This is an automated email from the ASF dual-hosted git repository.

jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git

The following commit(s) were added to refs/heads/master by this push:
     new ff66173  Deprecate the PostAggregationGapfill. (#8383)
ff66173 is described below

commit ff66173a27eda5d327eb06c669e76a6488c82045
Author: weixiangsun <91153405+weixiang...@users.noreply.github.com>
AuthorDate: Tue Mar 22 11:34:13 2022 -0700

    Deprecate the PostAggregationGapfill. (#8383)
---
 .../core/operator/transform/TransformOperator.java |   4 +-
 .../reduce/GapFillGroupByDataTableReducer.java     | 491 ----------------
 .../core/query/reduce/PostAggregationHandler.java  |   4 +-
 .../core/query/reduce/ResultReducerFactory.java    |   3 -
 .../org/apache/pinot/core/util/GapfillUtils.java   |  30 +-
 .../apache/pinot/queries/GapfillQueriesTest.java   |   1 -
 .../queries/PostAggregationGapfillQueriesTest.java | 616 ---------------------
 7 files changed, 8 insertions(+), 1141 deletions(-)

diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/transform/TransformOperator.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/transform/TransformOperator.java
index 08bfeb8..d3b752e 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/operator/transform/TransformOperator.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/transform/TransformOperator.java
@@ -36,7 +36,6 @@ import org.apache.pinot.core.operator.blocks.TransformBlock;
 import org.apache.pinot.core.operator.transform.function.TransformFunction;
 import org.apache.pinot.core.operator.transform.function.TransformFunctionFactory;
 import org.apache.pinot.core.query.request.context.QueryContext;
-import org.apache.pinot.core.util.GapfillUtils;
 import org.apache.pinot.segment.spi.datasource.DataSource;
 import org.apache.pinot.segment.spi.index.reader.Dictionary;
 
@@ -63,8 +62,7 @@ public class TransformOperator extends BaseOperator<TransformBlock> {
     _projectionOperator = projectionOperator;
     _dataSourceMap = projectionOperator.getDataSourceMap();
     for (ExpressionContext expression : expressions) {
-      TransformFunction transformFunction =
-          TransformFunctionFactory.get(queryContext, GapfillUtils.stripGapfill(expression), _dataSourceMap);
+      TransformFunction transformFunction = TransformFunctionFactory.get(queryContext, expression, _dataSourceMap);
       _transformFunctionMap.put(expression, transformFunction);
     }
   }
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/GapFillGroupByDataTableReducer.java b/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/GapFillGroupByDataTableReducer.java
deleted file mode 100644
index 9f739b0..0000000
--- a/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/GapFillGroupByDataTableReducer.java
+++ /dev/null
@@ -1,491 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pinot.core.query.reduce;
-
-import com.google.common.base.Preconditions;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
-import java.util.stream.Collectors;
-import org.apache.pinot.common.exception.QueryException;
-import org.apache.pinot.common.metrics.BrokerGauge;
-import org.apache.pinot.common.metrics.BrokerMeter;
-import org.apache.pinot.common.metrics.BrokerMetrics;
-import org.apache.pinot.common.request.context.ExpressionContext;
-import org.apache.pinot.common.request.context.FilterContext;
-import org.apache.pinot.common.request.context.OrderByExpressionContext;
-import org.apache.pinot.common.response.broker.BrokerResponseNative;
-import org.apache.pinot.common.response.broker.QueryProcessingException;
-import org.apache.pinot.common.response.broker.ResultTable;
-import org.apache.pinot.common.utils.DataSchema;
-import org.apache.pinot.common.utils.DataSchema.ColumnDataType;
-import org.apache.pinot.common.utils.DataTable;
-import org.apache.pinot.core.data.table.ConcurrentIndexedTable;
-import org.apache.pinot.core.data.table.IndexedTable;
-import org.apache.pinot.core.data.table.Key;
-import org.apache.pinot.core.data.table.Record;
-import org.apache.pinot.core.data.table.SimpleIndexedTable;
-import org.apache.pinot.core.data.table.UnboundedConcurrentIndexedTable;
-import org.apache.pinot.core.operator.combine.GroupByOrderByCombineOperator;
-import org.apache.pinot.core.query.aggregation.function.AggregationFunction;
-import org.apache.pinot.core.query.request.context.QueryContext;
-import org.apache.pinot.core.transport.ServerRoutingInstance;
-import org.apache.pinot.core.util.GapfillUtils;
-import org.apache.pinot.core.util.GroupByUtils;
-import org.apache.pinot.core.util.trace.TraceCallable;
-import org.apache.pinot.spi.data.DateTimeFormatSpec;
-import org.apache.pinot.spi.data.DateTimeGranularitySpec;
-
-
-/**
- * Helper class to reduce data tables and set group by results into the BrokerResponseNative
- */
-@SuppressWarnings({"rawtypes", "unchecked"})
-public class GapFillGroupByDataTableReducer implements DataTableReducer {
-  private static final int MIN_DATA_TABLES_FOR_CONCURRENT_REDUCE = 2; // TBD, find a better value.
-
-  private final QueryContext _queryContext;
-  private final AggregationFunction[] _aggregationFunctions;
-  private final int _numAggregationFunctions;
-  private final List<ExpressionContext> _groupByExpressions;
-  private final int _numGroupByExpressions;
-  private final int _numColumns;
-  private final DateTimeGranularitySpec _dateTimeGranularity;
-  private final DateTimeFormatSpec _dateTimeFormatter;
-  private final long _startMs;
-  private final long _endMs;
-  private final Set<Key> _groupByKeys;
-  private final Map<Key, Object[]> _previousByGroupKey;
-  private final int _numOfGroupByKeys;
-  private final List<Integer> _groupByKeyIndexes;
-  private final boolean [] _isGroupBySelections;
-  private int _timeBucketIndex = -1;
-
-  GapFillGroupByDataTableReducer(QueryContext queryContext) {
-    Preconditions.checkArgument(
-        queryContext.getBrokerRequest().getPinotQuery() != null, "GapFill can only be applied to sql query");
-    _queryContext = queryContext;
-    _aggregationFunctions = queryContext.getAggregationFunctions();
-    assert _aggregationFunctions != null;
-    _numAggregationFunctions = _aggregationFunctions.length;
-    _groupByExpressions = queryContext.getGroupByExpressions();
-    assert _groupByExpressions != null;
-    _numGroupByExpressions = _groupByExpressions.size();
-    _numColumns = _numAggregationFunctions + _numGroupByExpressions;
-
-    ExpressionContext gapFillSelection = null;
-    for (ExpressionContext expressionContext : _queryContext.getSelectExpressions()) {
-      if (GapfillUtils.isPostAggregateGapfill(expressionContext)) {
-        gapFillSelection = expressionContext;
-        break;
-      }
-    }
-
-    List<ExpressionContext> args = gapFillSelection.getFunction().getArguments();
-    Preconditions.checkArgument(
-        args.size() == 5, "PostAggregateGapFill does not have correct number of arguments.");
-    Preconditions.checkArgument(
-        args.get(1).getLiteral() != null, "The second argument of PostAggregateGapFill should be TimeFormatter.");
-    Preconditions.checkArgument(
-        args.get(2).getLiteral() != null, "The third argument of PostAggregateGapFill should be start time.");
-    Preconditions.checkArgument(
-        args.get(3).getLiteral() != null, "The fourth argument of PostAggregateGapFill should be end time.");
-    Preconditions.checkArgument(
-        args.get(4).getLiteral() != null, "The fifth argument of PostAggregateGapFill should be time bucket size.");
-
-    boolean orderByTimeBucket = false;
-    if (_queryContext.getOrderByExpressions() != null && !_queryContext.getOrderByExpressions().isEmpty()) {
-      OrderByExpressionContext firstOrderByExpression = _queryContext.getOrderByExpressions().get(0);
-      orderByTimeBucket =
-          firstOrderByExpression.isAsc() && firstOrderByExpression.getExpression().equals(gapFillSelection);
-    }
-
-    Preconditions.checkArgument(
-        orderByTimeBucket, "PostAggregateGapFill does not work if the time bucket is not ordered.");
-
-    _dateTimeFormatter = new DateTimeFormatSpec(args.get(1).getLiteral());
-    _dateTimeGranularity = new DateTimeGranularitySpec(args.get(4).getLiteral());
-    String start = args.get(2).getLiteral();
-    String end = args.get(3).getLiteral();
-    _startMs = truncate(_dateTimeFormatter.fromFormatToMillis(start));
-    _endMs = truncate(_dateTimeFormatter.fromFormatToMillis(end));
-    _groupByKeys = new HashSet<>();
-    _previousByGroupKey = new HashMap<>();
-    _numOfGroupByKeys = _queryContext.getGroupByExpressions().size() - 1;
-    _groupByKeyIndexes = new ArrayList<>();
-    _isGroupBySelections = new boolean[_queryContext.getSelectExpressions().size()];
-
-    for (ExpressionContext expressionContext : _groupByExpressions) {
-      if (GapfillUtils.isPostAggregateGapfill(expressionContext)) {
-        for (int i = 0; i < _queryContext.getSelectExpressions().size(); i++) {
-          if (expressionContext.equals(_queryContext.getSelectExpressions().get(i))) {
-            _timeBucketIndex = i;
-            _isGroupBySelections[i] = true;
-            break;
-          }
-        }
-      } else {
-        for (int i = 0; i < _queryContext.getSelectExpressions().size(); i++) {
-          if (expressionContext.equals(_queryContext.getSelectExpressions().get(i))) {
-            _groupByKeyIndexes.add(i);
-            _isGroupBySelections[i] = true;
-            break;
-          }
-        }
-      }
-    }
-
-    Preconditions.checkArgument(_timeBucketIndex >= 0, "There is no time bucket.");
-  }
-
-  private long truncate(long epoch) {
-    int sz = _dateTimeGranularity.getSize();
-    return epoch / sz * sz;
-  }
-
-  /**
-   * Reduces and sets group by results into ResultTable, if responseFormat = sql
-   * By default, sets group by results into GroupByResults
-   */
-  @Override
-  public void reduceAndSetResults(String tableName, DataSchema dataSchema,
-      Map<ServerRoutingInstance, DataTable> dataTableMap, BrokerResponseNative brokerResponseNative,
-      DataTableReducerContext reducerContext, BrokerMetrics brokerMetrics) {
-    assert dataSchema != null;
-    Collection<DataTable> dataTables = dataTableMap.values();
-
-    try {
-      setSQLGroupByInResultTable(brokerResponseNative, dataSchema, dataTables, reducerContext, tableName,
-          brokerMetrics);
-    } catch (TimeoutException e) {
-      brokerResponseNative.getProcessingExceptions()
-          .add(new QueryProcessingException(QueryException.BROKER_TIMEOUT_ERROR_CODE, e.getMessage()));
-    }
-    int resultSize = brokerResponseNative.getResultTable().getRows().size();
-
-    if (brokerMetrics != null && resultSize > 0) {
-      brokerMetrics.addMeteredTableValue(tableName, BrokerMeter.GROUP_BY_SIZE, resultSize);
-    }
-  }
-
-  private Key constructGroupKeys(Object[] row) {
-    Object [] groupKeys = new Object[_numOfGroupByKeys];
-    for (int i = 0; i < _numOfGroupByKeys; i++) {
-      groupKeys[i] = row[_groupByKeyIndexes.get(i)];
-    }
-    return new Key(groupKeys);
-  }
-
-  /**
-   * Extract group by order by results and set into {@link ResultTable}
-   * @param brokerResponseNative broker response
-   * @param dataSchema data schema
-   * @param dataTables Collection of data tables
-   * @param reducerContext DataTableReducer context
-   * @param rawTableName table name
-   * @param brokerMetrics broker metrics (meters)
-   * @throws TimeoutException If unable complete within timeout.
-   */
-  private void setSQLGroupByInResultTable(BrokerResponseNative brokerResponseNative, DataSchema dataSchema,
-      Collection<DataTable> dataTables, DataTableReducerContext reducerContext, String rawTableName,
-      BrokerMetrics brokerMetrics)
-      throws TimeoutException {
-    IndexedTable indexedTable = getIndexedTable(dataSchema, dataTables, reducerContext);
-    if (brokerMetrics != null) {
-      brokerMetrics.addMeteredTableValue(rawTableName, BrokerMeter.NUM_RESIZES, indexedTable.getNumResizes());
-      brokerMetrics.addValueToTableGauge(rawTableName, BrokerGauge.RESIZE_TIME_MS, indexedTable.getResizeTimeMs());
-    }
-    DataSchema prePostAggregationDataSchema = getPrePostAggregationDataSchema(dataSchema);
-    ColumnDataType[] columnDataTypes = prePostAggregationDataSchema.getColumnDataTypes();
-
-    PostAggregationHandler postAggregationHandler =
-        new PostAggregationHandler(_queryContext, prePostAggregationDataSchema);
-    DataSchema resultDataSchema = postAggregationHandler.getResultDataSchema();
-    ColumnDataType[] resultColumnDataTypes = resultDataSchema.getColumnDataTypes();
-    Iterator<Record> sortedIterator = indexedTable.iterator();
-    while (sortedIterator.hasNext()) {
-      Object[] row = sortedIterator.next().getValues();
-      extractFinalAggregationResults(row);
-      for (int i = 0; i < columnDataTypes.length; i++) {
-        row[i] = columnDataTypes[i].convert(row[i]);
-      }
-      Object[] resultRow = postAggregationHandler.getResult(row);
-      for (int i = 0; i < resultColumnDataTypes.length; i++) {
-        resultRow[i] = resultColumnDataTypes[i].format(resultRow[i]);
-      }
-
-      _groupByKeys.add(constructGroupKeys(resultRow));
-    }
-
-    List<Object[]> gapfillResultRows = gapFill(indexedTable.iterator(), postAggregationHandler);
-    brokerResponseNative.setResultTable(new ResultTable(resultDataSchema, gapfillResultRows));
-  }
-
-  List<Object[]> gapFill(Iterator<Record> sortedIterator, PostAggregationHandler postAggregationHandler) {
-    DataSchema resultDataSchema = postAggregationHandler.getResultDataSchema();
-    ColumnDataType[] resultColumnDataTypes = resultDataSchema.getColumnDataTypes();
-    int limit = _queryContext.getLimit();
-    int numResultColumns = resultColumnDataTypes.length;
-    List<Object[]> gapfillResultRows = new ArrayList<>(limit);
-    long step = _dateTimeGranularity.granularityToMillis();
-    FilterContext havingFilter = _queryContext.getHavingFilter();
-    HavingFilterHandler havingFilterHandler = null;
-    if (havingFilter != null) {
-      havingFilterHandler = new HavingFilterHandler(havingFilter, postAggregationHandler);
-    }
-    Record record = null;
-    for (long time = _startMs; time + 2 * step <= _endMs; time += step) {
-      Set<Key> keys = new HashSet<>(_groupByKeys);
-      if (record == null && sortedIterator.hasNext()) {
-        record = sortedIterator.next();
-      }
-
-      while (record != null) {
-        Object[] row = record.getValues();
-
-        Object[] resultRow = postAggregationHandler.getResult(row);
-        for (int i = 0; i < resultColumnDataTypes.length; i++) {
-          resultRow[i] = resultColumnDataTypes[i].format(resultRow[i]);
-        }
-
-        long timeCol = _dateTimeFormatter.fromFormatToMillis(String.valueOf(resultRow[_timeBucketIndex]));
-        if (timeCol > time) {
-          break;
-        }
-        if (timeCol == time) {
-          if (havingFilterHandler == null || havingFilterHandler.isMatch(row)) {
-            gapfillResultRows.add(resultRow);
-            if (gapfillResultRows.size() == limit) {
-              return gapfillResultRows;
-            }
-          }
-          Key key = constructGroupKeys(resultRow);
-          keys.remove(key);
-          _previousByGroupKey.put(key, resultRow);
-        }
-        if (sortedIterator.hasNext()) {
-          record = sortedIterator.next();
-        } else {
-          record = null;
-        }
-      }
-
-      for (Key key : keys) {
-        Object[] gapfillRow = new Object[numResultColumns];
-        int keyIndex = 0;
-        for (int i = 0; i < _isGroupBySelections.length; i++) {
-          if (_isGroupBySelections[i]) {
-            if (i == _timeBucketIndex) {
-              if (resultColumnDataTypes[i] == ColumnDataType.LONG) {
-                gapfillRow[_timeBucketIndex] = Long.valueOf(_dateTimeFormatter.fromMillisToFormat(time));
-              } else {
-                gapfillRow[_timeBucketIndex] = _dateTimeFormatter.fromMillisToFormat(time);
-              }
-            } else {
-              gapfillRow[i] = key.getValues()[keyIndex++];
-            }
-          } else {
-            gapfillRow[i] = getFillValue(i, key, resultColumnDataTypes[i]);
-          }
-        }
-
-        if (havingFilterHandler == null || havingFilterHandler.isMatch(gapfillRow)) {
-          gapfillResultRows.add(gapfillRow);
-          if (gapfillResultRows.size() == limit) {
-            return gapfillResultRows;
-          }
-        }
-      }
-    }
-    return gapfillResultRows;
-  }
-
-  Object getFillValue(int columIndex, Object key, ColumnDataType dataType) {
-    ExpressionContext expressionContext = _queryContext.getSelectExpressions().get(columIndex);
-    if (expressionContext.getFunction() != null && GapfillUtils.isFill(expressionContext)) {
-      List<ExpressionContext> args = expressionContext.getFunction().getArguments();
-      if (args.get(1).getLiteral() == null) {
-        throw new UnsupportedOperationException("Wrong Sql.");
-      }
-      GapfillUtils.FillType fillType = GapfillUtils.FillType.valueOf(args.get(1).getLiteral());
-      if (fillType == GapfillUtils.FillType.FILL_DEFAULT_VALUE) {
-        // TODO: may fill the default value from sql in the future.
-        return GapfillUtils.getDefaultValue(dataType);
-      } else if (fillType == GapfillUtils.FillType.FILL_PREVIOUS_VALUE) {
-        Object[] row = _previousByGroupKey.get(key);
-        if (row != null) {
-          return row[columIndex];
-        } else {
-          return GapfillUtils.getDefaultValue(dataType);
-        }
-      } else {
-        throw new UnsupportedOperationException("unsupported fill type.");
-      }
-    } else {
-      return GapfillUtils.getDefaultValue(dataType);
-    }
-  }
-
-  /**
-   * Helper method to extract the final aggregation results for the given row (in-place).
-   */
-  private void extractFinalAggregationResults(Object[] row) {
-    for (int i = 0; i < _numAggregationFunctions; i++) {
-      int valueIndex = i + _numGroupByExpressions;
-      row[valueIndex] = _aggregationFunctions[i].extractFinalResult(row[valueIndex]);
-    }
-  }
-
-  /**
-   * Constructs the DataSchema for the rows before the post-aggregation (SQL mode).
-   */
-  private DataSchema getPrePostAggregationDataSchema(DataSchema dataSchema) {
-    String[] columnNames = dataSchema.getColumnNames();
-    ColumnDataType[] columnDataTypes = new ColumnDataType[_numColumns];
-    System.arraycopy(dataSchema.getColumnDataTypes(), 0, columnDataTypes, 0, _numGroupByExpressions);
-    for (int i = 0; i < _numAggregationFunctions; i++) {
-      columnDataTypes[i + _numGroupByExpressions] = _aggregationFunctions[i].getFinalResultColumnType();
-    }
-    return new DataSchema(columnNames, columnDataTypes);
-  }
-
-  private IndexedTable getIndexedTable(DataSchema dataSchema, Collection<DataTable> dataTablesToReduce,
-      DataTableReducerContext reducerContext)
-      throws TimeoutException {
-    long start = System.currentTimeMillis();
-    int numDataTables = dataTablesToReduce.size();
-
-    // Get the number of threads to use for reducing.
-    // In case of single reduce thread, fall back to SimpleIndexedTable to avoid redundant locking/unlocking calls.
-    int numReduceThreadsToUse = getNumReduceThreadsToUse(numDataTables, reducerContext.getMaxReduceThreadsPerQuery());
-    int limit = _queryContext.getLimit();
-    // TODO: Make minTrimSize configurable
-    int trimSize = GroupByUtils.getTableCapacity(limit);
-    // NOTE: For query with HAVING clause, use trimSize as resultSize to ensure the result accuracy.
-    // TODO: Resolve the HAVING clause within the IndexedTable before returning the result
-    int resultSize = _queryContext.getHavingFilter() != null ? trimSize : limit;
-    int trimThreshold = reducerContext.getGroupByTrimThreshold();
-    IndexedTable indexedTable;
-    if (numReduceThreadsToUse <= 1) {
-      indexedTable = new SimpleIndexedTable(dataSchema, _queryContext, resultSize, trimSize, trimThreshold);
-    } else {
-      if (trimThreshold >= GroupByOrderByCombineOperator.MAX_TRIM_THRESHOLD) {
-        // special case of trim threshold where it is set to max value.
-        // there won't be any trimming during upsert in this case.
-        // thus we can avoid the overhead of read-lock and write-lock
-        // in the upsert method.
-        indexedTable = new UnboundedConcurrentIndexedTable(dataSchema, _queryContext, resultSize);
-      } else {
-        indexedTable = new ConcurrentIndexedTable(dataSchema, _queryContext, resultSize, trimSize, trimThreshold);
-      }
-    }
-
-    // Create groups of data tables that each thread can process concurrently.
-    // Given that numReduceThreads is <= numDataTables, each group will have at least one data table.
-    ArrayList<DataTable> dataTables = new ArrayList<>(dataTablesToReduce);
-    List<List<DataTable>> reduceGroups = new ArrayList<>(numReduceThreadsToUse);
-
-    for (int i = 0; i < numReduceThreadsToUse; i++) {
-      reduceGroups.add(new ArrayList<>());
-    }
-    for (int i = 0; i < numDataTables; i++) {
-      reduceGroups.get(i % numReduceThreadsToUse).add(dataTables.get(i));
-    }
-
-    ColumnDataType[] storedColumnDataTypes = dataSchema.getStoredColumnDataTypes();
-    long timeOutMs = reducerContext.getReduceTimeOutMs() - (System.currentTimeMillis() - start);
-    try {
-      reducerContext.getExecutorService().invokeAll(reduceGroups.stream()
-          .map(reduceGroup -> new TraceCallable<Void>() {
-            @Override
-            public Void callJob() throws Exception {
-              for (DataTable dataTable : reduceGroup) {
-                int numRows = dataTable.getNumberOfRows();
-                for (int rowId = 0; rowId < numRows; rowId++) {
-                  Object[] values = new Object[_numColumns];
-                  for (int colId = 0; colId < _numColumns; colId++) {
-                    switch (storedColumnDataTypes[colId]) {
-                      case INT:
-                        values[colId] = dataTable.getInt(rowId, colId);
-                        break;
-                      case LONG:
-                        values[colId] = dataTable.getLong(rowId, colId);
-                        break;
-                      case FLOAT:
-                        values[colId] = dataTable.getFloat(rowId, colId);
-                        break;
-                      case DOUBLE:
-                        values[colId] = dataTable.getDouble(rowId, colId);
-                        break;
-                      case STRING:
-                        values[colId] = dataTable.getString(rowId, colId);
-                        break;
-                      case BYTES:
-                        values[colId] = dataTable.getBytes(rowId, colId);
-                        break;
-                      case OBJECT:
-                        values[colId] = dataTable.getObject(rowId, colId);
-                        break;
-                      // Add other aggregation intermediate result / group-by column type supports here
-                      default:
-                        throw new IllegalStateException();
-                    }
-                  }
-                  indexedTable.upsert(new Record(values));
-                }
-              }
-              return null;
-            }
-          }).collect(Collectors.toList()), timeOutMs, TimeUnit.MILLISECONDS);
-    } catch (InterruptedException e) {
-      throw new TimeoutException("Timed out in broker reduce phase.");
-    }
-
-    indexedTable.finish(true);
-    return indexedTable;
-  }
-
-  /**
-   * Computes the number of reduce threads to use per query.
-   * <ul>
-   *   <li> Use single thread if number of data tables to reduce is less than
-   *   {@value #MIN_DATA_TABLES_FOR_CONCURRENT_REDUCE}.</li>
-   *   <li> Else, use min of max allowed reduce threads per query, and number of data tables.</li>
-   * </ul>
-   *
-   * @param numDataTables Number of data tables to reduce
-   * @param maxReduceThreadsPerQuery Max allowed reduce threads per query
-   * @return Number of reduce threads to use for the query
-   */
-  private int getNumReduceThreadsToUse(int numDataTables, int maxReduceThreadsPerQuery) {
-    // Use single thread if number of data tables < MIN_DATA_TABLES_FOR_CONCURRENT_REDUCE.
-    if (numDataTables < MIN_DATA_TABLES_FOR_CONCURRENT_REDUCE) {
-      return Math.min(1, numDataTables); // Number of data tables can be zero.
-    }
-
-    return Math.min(maxReduceThreadsPerQuery, numDataTables);
-  }
-}
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/PostAggregationHandler.java b/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/PostAggregationHandler.java
index 7fb878c..4c4bcaf 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/PostAggregationHandler.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/PostAggregationHandler.java
@@ -35,7 +35,6 @@ import org.apache.pinot.core.query.reduce.filter.LiteralValueExtractor;
 import org.apache.pinot.core.query.reduce.filter.ValueExtractor;
 import org.apache.pinot.core.query.reduce.filter.ValueExtractorFactory;
 import org.apache.pinot.core.query.request.context.QueryContext;
-import org.apache.pinot.core.util.GapfillUtils;
 
 
 /**
@@ -58,7 +57,7 @@ public class PostAggregationHandler implements ValueExtractorFactory {
       _numGroupByExpressions = groupByExpressions.size();
       _groupByExpressionIndexMap = new HashMap<>();
       for (int i = 0; i < _numGroupByExpressions; i++) {
-        _groupByExpressionIndexMap.put(GapfillUtils.stripGapfill(groupByExpressions.get(i)), i);
+        _groupByExpressionIndexMap.put(groupByExpressions.get(i), i);
       }
     } else {
       _numGroupByExpressions = 0;
@@ -107,7 +106,6 @@ public class PostAggregationHandler implements ValueExtractorFactory {
    */
   @Override
   public ValueExtractor getValueExtractor(ExpressionContext expression) {
-    expression = GapfillUtils.stripGapfill(expression);
     if (expression.getType() == ExpressionContext.Type.LITERAL) {
       // Literal
       return new LiteralValueExtractor(expression.getLiteral());
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/ResultReducerFactory.java b/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/ResultReducerFactory.java
index a48a007..3631a3c 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/ResultReducerFactory.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/ResultReducerFactory.java
@@ -23,7 +23,6 @@ import org.apache.pinot.core.query.aggregation.function.AggregationFunction;
 import org.apache.pinot.core.query.aggregation.function.DistinctAggregationFunction;
 import org.apache.pinot.core.query.request.context.QueryContext;
 import org.apache.pinot.core.query.request.context.utils.QueryContextUtils;
-import org.apache.pinot.core.util.GapfillUtils;
 import org.apache.pinot.segment.spi.AggregationFunctionType;
 
 
@@ -58,8 +57,6 @@ public final class ResultReducerFactory {
       } else {
         return new AggregationDataTableReducer(queryContext);
       }
-    } else if (GapfillUtils.isPostAggregateGapfill(queryContext)) {
-      return new GapFillGroupByDataTableReducer(queryContext);
     } else {
       // Aggregation group-by query
       return new GroupByDataTableReducer(queryContext);
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/util/GapfillUtils.java b/pinot-core/src/main/java/org/apache/pinot/core/util/GapfillUtils.java
index c40837e..988f780 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/util/GapfillUtils.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/util/GapfillUtils.java
@@ -39,7 +39,6 @@ import org.apache.pinot.core.query.request.context.QueryContext;
  * Util class to encapsulate all utilites required for gapfill.
  */
 public class GapfillUtils {
-  private static final String POST_AGGREGATE_GAP_FILL = "postaggregategapfill";
   private static final String GAP_FILL = "gapfill";
   private static final String AS = "as";
   private static final String FILL = "fill";
@@ -56,29 +55,12 @@
     FunctionContext function = expression.getFunction();
     String functionName = function.getFunctionName();
-    if (functionName.equals(POST_AGGREGATE_GAP_FILL) || functionName.equals(FILL) || functionName.equals(GAP_FILL)) {
+    if (functionName.equals(FILL) || functionName.equals(GAP_FILL)) {
       return function.getArguments().get(0);
     }
     return expression;
   }
 
-  public static boolean isPostAggregateGapfill(ExpressionContext expressionContext) {
-    if (expressionContext.getType() != ExpressionContext.Type.FUNCTION) {
-      return false;
-    }
-
-    return POST_AGGREGATE_GAP_FILL.equals(expressionContext.getFunction().getFunctionName());
-  }
-
-  public static boolean isPostAggregateGapfill(QueryContext queryContext) {
-    for (ExpressionContext expressionContext : queryContext.getSelectExpressions()) {
-      if (isPostAggregateGapfill(expressionContext)) {
-        return true;
-      }
-    }
-    return false;
-  }
-
   public static boolean isFill(ExpressionContext expressionContext) {
     if (expressionContext.getType() != ExpressionContext.Type.FUNCTION) {
       return false;
@@ -189,15 +171,15 @@
     Preconditions.checkArgument(gapFillSelection != null && gapFillSelection.getFunction() != null,
         "Gapfill Expression should be function.");
     List<ExpressionContext> args = gapFillSelection.getFunction().getArguments();
-    Preconditions.checkArgument(args.size() > 5, "PreAggregateGapFill does not have correct number of arguments.");
+    Preconditions.checkArgument(args.size() > 5, "Gapfill does not have correct number of arguments.");
     Preconditions.checkArgument(args.get(1).getLiteral() != null,
-        "The second argument of PostAggregateGapFill should be TimeFormatter.");
+        "The second argument of Gapfill should be TimeFormatter.");
     Preconditions.checkArgument(args.get(2).getLiteral() != null,
-        "The third argument of PostAggregateGapFill should be start time.");
+        "The third argument of Gapfill should be start time.");
     Preconditions.checkArgument(args.get(3).getLiteral() != null,
-        "The fourth argument of PostAggregateGapFill should be end time.");
+        "The fourth argument of Gapfill should be end time.");
     Preconditions.checkArgument(args.get(4).getLiteral() != null,
-        "The fifth argument of PostAggregateGapFill should be time bucket size.");
+        "The fifth argument of Gapfill should be time bucket size.");
 
     ExpressionContext timeseriesOn = GapfillUtils.getTimeSeriesOnExpressionContext(gapFillSelection);
     Preconditions.checkArgument(timeseriesOn != null, "The TimeSeriesOn expressions should be specified.");
diff --git a/pinot-core/src/test/java/org/apache/pinot/queries/GapfillQueriesTest.java b/pinot-core/src/test/java/org/apache/pinot/queries/GapfillQueriesTest.java
index bf0a28b..829a55d 100644
--- a/pinot-core/src/test/java/org/apache/pinot/queries/GapfillQueriesTest.java
+++ b/pinot-core/src/test/java/org/apache/pinot/queries/GapfillQueriesTest.java
@@ -51,7 +51,6 @@ import org.testng.annotations.Test;
  * Queries test for Gapfill queries.
  */
 // TODO: Item 1. table alias for subquery in next PR
-// TODO: Item 2. Deprecate PostAggregateGapfill implementation in next PR
 @SuppressWarnings("rawtypes")
 public class GapfillQueriesTest extends BaseQueriesTest {
   private static final File INDEX_DIR = new File(FileUtils.getTempDirectory(), "PostAggregationGapfillQueriesTest");
diff --git a/pinot-core/src/test/java/org/apache/pinot/queries/PostAggregationGapfillQueriesTest.java b/pinot-core/src/test/java/org/apache/pinot/queries/PostAggregationGapfillQueriesTest.java
deleted file mode 100644
index bfd7baa..0000000
--- a/pinot-core/src/test/java/org/apache/pinot/queries/PostAggregationGapfillQueriesTest.java
+++ /dev/null
@@ -1,616 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pinot.queries;
-
-import java.io.File;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Random;
-import java.util.Set;
-import org.apache.commons.io.FileUtils;
-import org.apache.pinot.common.response.broker.BrokerResponseNative;
-import org.apache.pinot.common.response.broker.ResultTable;
-import org.apache.pinot.segment.local.indexsegment.immutable.ImmutableSegmentLoader;
-import org.apache.pinot.segment.local.segment.creator.impl.SegmentIndexCreationDriverImpl;
-import org.apache.pinot.segment.local.segment.readers.GenericRowRecordReader;
-import org.apache.pinot.segment.spi.ImmutableSegment;
-import org.apache.pinot.segment.spi.IndexSegment;
-import org.apache.pinot.segment.spi.creator.SegmentGeneratorConfig;
-import org.apache.pinot.spi.config.table.TableConfig;
-import org.apache.pinot.spi.config.table.TableType;
-import org.apache.pinot.spi.data.DateTimeFormatSpec;
-import org.apache.pinot.spi.data.DateTimeGranularitySpec;
-import org.apache.pinot.spi.data.FieldSpec.DataType;
-import org.apache.pinot.spi.data.Schema;
-import org.apache.pinot.spi.data.readers.GenericRow;
-import org.apache.pinot.spi.utils.ReadMode;
-import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
-import org.testng.Assert;
-import org.testng.annotations.AfterClass;
-import org.testng.annotations.BeforeClass;
-import org.testng.annotations.Test;
-
-
-/**
- * Queries test for PostAggregationGapfill queries.
- */
-@SuppressWarnings("rawtypes")
-public class PostAggregationGapfillQueriesTest extends BaseQueriesTest {
-  private static final File INDEX_DIR = new File(FileUtils.getTempDirectory(), "PostAggregationGapfillQueriesTest");
-  private static final String RAW_TABLE_NAME = "parkingData";
-  private static final String SEGMENT_NAME = "testSegment";
-  private static final Random RANDOM = new Random();
-
-  private static final int NUM_LOTS = 4;
-
-  private static final String IS_OCCUPIED_COLUMN = "isOccupied";
-  private static final String LOT_ID_COLUMN = "lotId";
-  private static final String EVENT_TIME_COLUMN = "eventTime";
-  private static final Schema SCHEMA = new Schema.SchemaBuilder()
-      .addSingleValueDimension(IS_OCCUPIED_COLUMN, DataType.BOOLEAN)
-      .addSingleValueDimension(LOT_ID_COLUMN, DataType.STRING)
-      .addSingleValueDimension(EVENT_TIME_COLUMN, DataType.LONG)
-      .setPrimaryKeyColumns(Arrays.asList(LOT_ID_COLUMN, EVENT_TIME_COLUMN))
-      .build();
-  private static final TableConfig TABLE_CONFIG = new TableConfigBuilder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME)
-      .build();
-
-  private IndexSegment _indexSegment;
-  private List<IndexSegment> _indexSegments;
-
-  @Override
-  protected String getFilter() {
-    // NOTE: Use a match all filter to switch between DictionaryBasedAggregationOperator and AggregationOperator
-    return " WHERE eventTime >= 0";
-  }
-
-  @Override
-  protected IndexSegment getIndexSegment() {
-    return _indexSegment;
-  }
-
-  @Override
-  protected List<IndexSegment> getIndexSegments() {
-    return _indexSegments;
-  }
-
-  @BeforeClass
-  public void setUp()
-      throws Exception {
-    FileUtils.deleteDirectory(INDEX_DIR);
-
-    long current = 1636286400000L; //November 7, 2021 12:00:00 PM
-    int duplicates = 16;
-    int interval = 1000 * 900; // 15 minutes
-    long start = current - duplicates * 2 * interval; //November 7, 2021 4:00:00 AM
-
-    List<GenericRow> records = new ArrayList<>(NUM_LOTS * 2);
-    for (int i = 0; i < NUM_LOTS; i++) {
-      for (int j = 0; j < duplicates; j++) {
-        if (j == 4 || j == 5 || j == 6 || j == 7 || j == 10 || j == 11) {
-          continue;
-        }
-        long parkingTime = start + interval * 2 * j + RANDOM.nextInt(interval);
-        long departingTime = j == 3 ? start + interval * (2 * j + 6) + RANDOM.nextInt(interval) : start
-            + interval * (2 * j + 1) + RANDOM.nextInt(interval);
-
-        GenericRow parkingRow = new GenericRow();
-        parkingRow.putValue(EVENT_TIME_COLUMN, parkingTime);
-        parkingRow.putValue(LOT_ID_COLUMN, "LotId_" + String.valueOf(i));
-        parkingRow.putValue(IS_OCCUPIED_COLUMN, true);
-        records.add(parkingRow);
-
-        GenericRow departingRow = new GenericRow();
-        departingRow.putValue(EVENT_TIME_COLUMN, departingTime);
-        departingRow.putValue(LOT_ID_COLUMN, "LotId_" + String.valueOf(i));
-        departingRow.putValue(IS_OCCUPIED_COLUMN, false);
-        records.add(departingRow);
-      }
-    }
-
-    SegmentGeneratorConfig segmentGeneratorConfig = new SegmentGeneratorConfig(TABLE_CONFIG, SCHEMA);
-    segmentGeneratorConfig.setTableName(RAW_TABLE_NAME);
-    segmentGeneratorConfig.setSegmentName(SEGMENT_NAME);
-    segmentGeneratorConfig.setOutDir(INDEX_DIR.getPath());
-
-    SegmentIndexCreationDriverImpl driver = new SegmentIndexCreationDriverImpl();
-    driver.init(segmentGeneratorConfig, new GenericRowRecordReader(records));
-    driver.build();
-
-    ImmutableSegment immutableSegment = ImmutableSegmentLoader.load(new File(INDEX_DIR, SEGMENT_NAME), ReadMode.mmap);
-    _indexSegment = immutableSegment;
-    _indexSegments = Arrays.asList(immutableSegment);
-  }
-
-  @Test
-  public void datetimeconvertGapfillTest() {
-    String dataTimeConvertQuery = "SELECT "
-        + "DATETIMECONVERT(eventTime, '1:MILLISECONDS:EPOCH', "
-        + "'1:MILLISECONDS:SIMPLE_DATE_FORMAT:yyyy-MM-dd HH:mm:ss.SSS', '1:HOURS') AS time_col, "
-        + "lotId, "
-        + "lastWithTime(isOccupied, eventTime, 'BOOLEAN')"
-        + "FROM parkingData "
-        + "WHERE eventTime >= 1635940800000 AND eventTime <= 1636286400000 "
-        + "GROUP BY 1, 2 "
-        + "ORDER BY 1 "
-        + "LIMIT 200";
-
-    BrokerResponseNative dateTimeConvertBrokerResponse = getBrokerResponseForSqlQuery(dataTimeConvertQuery);
-
-    ResultTable dateTimeConvertResultTable = dateTimeConvertBrokerResponse.getResultTable();
-    Assert.assertEquals(dateTimeConvertResultTable.getRows().size(), 24);
-
-    String gapfillQuery = "SELECT "
-        + "PostAggregateGapFill(DATETIMECONVERT(eventTime, '1:MILLISECONDS:EPOCH', "
-        + "'1:MILLISECONDS:SIMPLE_DATE_FORMAT:yyyy-MM-dd HH:mm:ss.SSS', '1:HOURS'), "
-        + "'1:MILLISECONDS:SIMPLE_DATE_FORMAT:yyyy-MM-dd HH:mm:ss.SSS', "
-        + "'2021-11-07 3:00:00.000', '2021-11-07 12:00:00.000', '1:HOURS') AS time_col, "
-        + "lotId, "
-        + "FILL(lastWithTime(isOccupied, eventTime, 'BOOLEAN'), 'FILL_PREVIOUS_VALUE') as status1, "
-        + "FILL(lastWithTime(isOccupied, eventTime, 'BOOLEAN'), 'FILL_DEFAULT_VALUE') as status2, "
-        + "lastWithTime(isOccupied, eventTime, 'BOOLEAN') as status3 "
-        + "FROM parkingData "
-        + "WHERE eventTime >= 1635940800000 AND eventTime <= 1636286400000 "
-        + "GROUP BY 1, 2 "
-        + "ORDER BY 1 "
-        + "LIMIT 200";
-
-    DateTimeFormatSpec dateTimeFormatter
-        = new DateTimeFormatSpec("1:MILLISECONDS:SIMPLE_DATE_FORMAT:yyyy-MM-dd HH:mm:ss.SSS");
-    DateTimeGranularitySpec dateTimeGranularity = new DateTimeGranularitySpec("1:HOURS");
-
-    BrokerResponseNative gapfillBrokerResponse = getBrokerResponseForSqlQuery(gapfillQuery);
-
-    ResultTable gapFillResultTable = gapfillBrokerResponse.getResultTable();
-    Assert.assertEquals(gapFillResultTable.getRows().size(), 32);
-    List<Object[]> gapFillRows = gapFillResultTable.getRows();
-    long start = dateTimeFormatter.fromFormatToMillis("2021-11-07 03:00:00.000");
-    for (int i = 0; i < 32; i += 4) {
-      String firstTimeCol = (String) gapFillRows.get(i)[0];
-      long timeStamp = dateTimeFormatter.fromFormatToMillis(firstTimeCol);
-      Assert.assertEquals(timeStamp, start);
-      Set<String> lots = new HashSet<>();
-      lots.add((String) gapFillRows.get(i)[1]);
-      for (int j = 1; j < 4; j++) {
-        Assert.assertEquals(gapFillRows.get(i)[0], gapFillRows.get(i + j)[0]);
-        Assert.assertFalse(lots.contains(gapFillRows.get(i + j)[1]));
-        lots.add((String) gapFillRows.get(i + j)[1]);
-      }
-      start += dateTimeGranularity.granularityToMillis();
-    }
-  }
-
-  @Test
-  public void toEpochHoursGapfillTest() {
-    String dataTimeConvertQuery = "SELECT "
-        + "ToEpochHours(eventTime) AS time_col, "
-        + "lotId, "
-        + "lastWithTime(isOccupied, eventTime, 'BOOLEAN')"
-        + "FROM parkingData "
-        + "WHERE eventTime >= 1635940800000 AND eventTime <= 1636286400000 "
-        + "GROUP BY 1, 2 "
-        + "ORDER BY 1 "
-        + "LIMIT 200";
-
-    BrokerResponseNative dateTimeConvertBrokerResponse = getBrokerResponseForSqlQuery(dataTimeConvertQuery);
-
-    ResultTable dateTimeConvertResultTable = dateTimeConvertBrokerResponse.getResultTable();
-    Assert.assertEquals(dateTimeConvertResultTable.getRows().size(), 24);
-
-    String gapfillQuery = "SELECT "
-        + "PostAggregateGapFill(ToEpochHours(eventTime), '1:HOURS:EPOCH', "
-        + "'454515', '454524', '1:HOURS') AS time_col, "
-        + "lotId, "
-        + "FILL(lastWithTime(isOccupied, eventTime, 'BOOLEAN'), 'FILL_PREVIOUS_VALUE') as status1, "
-        + "FILL(lastWithTime(isOccupied, eventTime, 'BOOLEAN'), 'FILL_DEFAULT_VALUE') as status2, "
-        + "lastWithTime(isOccupied, eventTime, 'BOOLEAN') as status3 "
-        + "FROM parkingData "
-        + "WHERE eventTime >= 1635940800000 AND eventTime <= 1636286400000 "
-        + "GROUP BY 1, 2 "
-        + "ORDER BY 1 "
-        + "LIMIT 200";
-
-    DateTimeFormatSpec dateTimeFormatter = new DateTimeFormatSpec("1:HOURS:EPOCH");
-    DateTimeGranularitySpec dateTimeGranularity = new DateTimeGranularitySpec("1:HOURS");
-
-    BrokerResponseNative gapfillBrokerResponse = getBrokerResponseForSqlQuery(gapfillQuery);
-
-    ResultTable gapFillResultTable = gapfillBrokerResponse.getResultTable();
-    Assert.assertEquals(gapFillResultTable.getRows().size(), 32);
-    List<Object[]> gapFillRows = gapFillResultTable.getRows();
-    long start = dateTimeFormatter.fromFormatToMillis("454515");
-    for (int i = 0; i < 32; i += 4) {
-      Long firstTimeCol = (Long) gapFillRows.get(i)[0];
-      long timeStamp = dateTimeFormatter.fromFormatToMillis(firstTimeCol.toString());
-      Assert.assertEquals(timeStamp, start);
-      Set<String> lots = new HashSet<>();
-      lots.add((String) gapFillRows.get(i)[1]);
-      for (int j = 1; j < 4; j++) {
-        Assert.assertEquals(gapFillRows.get(i)[0], gapFillRows.get(i + j)[0]);
-        Assert.assertFalse(lots.contains(gapFillRows.get(i + j)[1]));
-        lots.add((String) gapFillRows.get(i + j)[1]);
-      }
-      start += dateTimeGranularity.granularityToMillis();
-    }
-  }
-
-  @Test
-  public void toEpochMinutesRoundedHoursGapfillTest() {
-    String dataTimeConvertQuery = "SELECT "
-        + "ToEpochMinutesRounded(eventTime, 60) AS time_col, "
-        + "lotId, "
-        + "lastWithTime(isOccupied, eventTime, 'BOOLEAN')"
-        + "FROM parkingData "
-        + "WHERE eventTime >= 1635940800000 AND eventTime <= 1636286400000 "
-        + "GROUP BY 1, 2 "
-        + "ORDER BY 1 "
-        + "LIMIT 200";
-
-    BrokerResponseNative dateTimeConvertBrokerResponse = getBrokerResponseForSqlQuery(dataTimeConvertQuery);
-
-    ResultTable dateTimeConvertResultTable = dateTimeConvertBrokerResponse.getResultTable();
-    Assert.assertEquals(dateTimeConvertResultTable.getRows().size(), 24);
-
-    String gapfillQuery = "SELECT "
-        + "PostAggregateGapFill(ToEpochMinutesRounded(eventTime, 60), '1:HOURS:EPOCH', "
-        + "'454515', '454524', '1:HOURS') AS time_col, "
-        + "lotId, "
" - + "FILL(lastWithTime(isOccupied, eventTime, 'BOOLEAN'), 'FILL_PREVIOUS_VALUE') as status1, " - + "FILL(lastWithTime(isOccupied, eventTime, 'BOOLEAN'), 'FILL_DEFAULT_VALUE') as status2, " - + "lastWithTime(isOccupied, eventTime, 'BOOLEAN') as status3 " - + "FROM parkingData " - + "WHERE eventTime >= 1635940800000 AND eventTime <= 1636286400000 " - + "GROUP BY 1, 2 " - + "ORDER BY 1 " - + "LIMIT 200"; - - DateTimeFormatSpec dateTimeFormatter = new DateTimeFormatSpec("1:HOURS:EPOCH"); - DateTimeGranularitySpec dateTimeGranularity = new DateTimeGranularitySpec("1:HOURS"); - - BrokerResponseNative gapfillBrokerResponse = getBrokerResponseForSqlQuery(gapfillQuery); - - ResultTable gapFillResultTable = gapfillBrokerResponse.getResultTable(); - Assert.assertEquals(gapFillResultTable.getRows().size(), 32); - List<Object[]> gapFillRows = gapFillResultTable.getRows(); - long start = dateTimeFormatter.fromFormatToMillis("454515"); - for (int i = 0; i < 32; i += 4) { - Long firstTimeCol = (Long) gapFillRows.get(i)[0]; - long timeStamp = dateTimeFormatter.fromFormatToMillis(firstTimeCol.toString()); - Assert.assertEquals(timeStamp, start); - Set<String> lots = new HashSet<>(); - lots.add((String) gapFillRows.get(i)[1]); - for (int j = 1; j < 4; j++) { - Assert.assertEquals(gapFillRows.get(i)[0], gapFillRows.get(i + j)[0]); - Assert.assertFalse(lots.contains(gapFillRows.get(i + j)[1])); - lots.add((String) gapFillRows.get(i + j)[1]); - } - start += dateTimeGranularity.granularityToMillis(); - } - } - - @Test - public void toEpochMinutesBucketHoursGapfillTest() { - String dataTimeConvertQuery = "SELECT " - + "ToEpochMinutesBucket(eventTime, 60) AS time_col, " - + "lotId, " - + "lastWithTime(isOccupied, eventTime, 'BOOLEAN')" - + "FROM parkingData " - + "WHERE eventTime >= 1635940800000 AND eventTime <= 1636286400000 " - + "GROUP BY 1, 2 " - + "ORDER BY 1 " - + "LIMIT 200"; - - BrokerResponseNative dateTimeConvertBrokerResponse = getBrokerResponseForSqlQuery(dataTimeConvertQuery); - - ResultTable dateTimeConvertResultTable = dateTimeConvertBrokerResponse.getResultTable(); - Assert.assertEquals(dateTimeConvertResultTable.getRows().size(), 24); - - String gapfillQuery = "SELECT " - + "PostAggregateGapFill(ToEpochMinutesBucket(eventTime, 60), '1:HOURS:EPOCH', " - + "'454515', '454524', '1:HOURS') AS time_col, " - + "lotId, " - + "FILL(lastWithTime(isOccupied, eventTime, 'BOOLEAN'), 'FILL_PREVIOUS_VALUE') as status1, " - + "FILL(lastWithTime(isOccupied, eventTime, 'BOOLEAN'), 'FILL_DEFAULT_VALUE') as status2, " - + "lastWithTime(isOccupied, eventTime, 'BOOLEAN') as status3 " - + "FROM parkingData " - + "WHERE eventTime >= 1635940800000 AND eventTime <= 1636286400000 " - + "GROUP BY 1, 2 " - + "ORDER BY 1 " - + "LIMIT 200"; - - DateTimeFormatSpec dateTimeFormatter = new DateTimeFormatSpec("1:HOURS:EPOCH"); - DateTimeGranularitySpec dateTimeGranularity = new DateTimeGranularitySpec("1:HOURS"); - - BrokerResponseNative gapfillBrokerResponse = getBrokerResponseForSqlQuery(gapfillQuery); - - ResultTable gapFillResultTable = gapfillBrokerResponse.getResultTable(); - Assert.assertEquals(gapFillResultTable.getRows().size(), 32); - List<Object[]> gapFillRows = gapFillResultTable.getRows(); - long start = dateTimeFormatter.fromFormatToMillis("454515"); - for (int i = 0; i < 32; i += 4) { - Long firstTimeCol = (Long) gapFillRows.get(i)[0]; - long timeStamp = dateTimeFormatter.fromFormatToMillis(firstTimeCol.toString()); - Assert.assertEquals(timeStamp, start); - Set<String> lots = new HashSet<>(); - lots.add((String) 
-      for (int j = 1; j < 4; j++) {
-        Assert.assertEquals(gapFillRows.get(i)[0], gapFillRows.get(i + j)[0]);
-        Assert.assertFalse(lots.contains(gapFillRows.get(i + j)[1]));
-        lots.add((String) gapFillRows.get(i + j)[1]);
-      }
-      start += dateTimeGranularity.granularityToMillis();
-    }
-  }
-
-  @Test
-  public void dateTruncHoursGapfillTest() {
-    String dataTimeConvertQuery = "SELECT "
-        + "DATETRUNC('hour', eventTime, 'milliseconds') AS time_col, "
-        + "lotId, "
-        + "lastWithTime(isOccupied, eventTime, 'BOOLEAN')"
-        + "FROM parkingData "
-        + "WHERE eventTime >= 1635940800000 AND eventTime <= 1636286400000 "
-        + "GROUP BY 1, 2 "
-        + "ORDER BY 1 "
-        + "LIMIT 200";
-
-    BrokerResponseNative dateTimeConvertBrokerResponse = getBrokerResponseForSqlQuery(dataTimeConvertQuery);
-
-    ResultTable dateTimeConvertResultTable = dateTimeConvertBrokerResponse.getResultTable();
-    Assert.assertEquals(dateTimeConvertResultTable.getRows().size(), 24);
-
-    String gapfillQuery = "SELECT "
-        + "PostAggregateGapFill(DATETRUNC('hour', eventTime, 'milliseconds'), '1:HOURS:EPOCH', "
-        + "'454515', '454524', '1:HOURS') AS time_col, "
-        + "lotId, "
-        + "FILL(lastWithTime(isOccupied, eventTime, 'BOOLEAN'), 'FILL_PREVIOUS_VALUE') as status1, "
-        + "FILL(lastWithTime(isOccupied, eventTime, 'BOOLEAN'), 'FILL_DEFAULT_VALUE') as status2, "
-        + "lastWithTime(isOccupied, eventTime, 'BOOLEAN') as status3 "
-        + "FROM parkingData "
-        + "WHERE eventTime >= 1635940800000 AND eventTime <= 1636286400000 "
-        + "GROUP BY 1, 2 "
-        + "ORDER BY 1 "
-        + "LIMIT 200";
-
-    DateTimeFormatSpec dateTimeFormatter = new DateTimeFormatSpec("1:HOURS:EPOCH");
-    DateTimeGranularitySpec dateTimeGranularity = new DateTimeGranularitySpec("1:HOURS");
-
-    BrokerResponseNative gapfillBrokerResponse = getBrokerResponseForSqlQuery(gapfillQuery);
-
-    ResultTable gapFillResultTable = gapfillBrokerResponse.getResultTable();
-    Assert.assertEquals(gapFillResultTable.getRows().size(), 32);
-    List<Object[]> gapFillRows = gapFillResultTable.getRows();
-    long start = dateTimeFormatter.fromFormatToMillis("454515");
-    for (int i = 0; i < 32; i += 4) {
-      Long firstTimeCol = (Long) gapFillRows.get(i)[0];
-      long timeStamp = dateTimeFormatter.fromFormatToMillis(firstTimeCol.toString());
-      Assert.assertEquals(timeStamp, start);
-      Set<String> lots = new HashSet<>();
-      lots.add((String) gapFillRows.get(i)[1]);
-      for (int j = 1; j < 4; j++) {
-        Assert.assertEquals(gapFillRows.get(i)[0], gapFillRows.get(i + j)[0]);
-        Assert.assertFalse(lots.contains(gapFillRows.get(i + j)[1]));
-        lots.add((String) gapFillRows.get(i + j)[1]);
-      }
-      start += dateTimeGranularity.granularityToMillis();
-    }
-  }
-
-  @Test
-  public void datetimeconvertGapfillTestWithoutTimeBucketOrdering() {
-    try {
-      String gapfillQuery = "SELECT "
-          + "PostAggregateGapFill(DATETIMECONVERT(eventTime, '1:MILLISECONDS:EPOCH', "
-          + "'1:MILLISECONDS:SIMPLE_DATE_FORMAT:yyyy-MM-dd HH:mm:ss.SSS', '1:HOURS'), "
-          + "'1:MILLISECONDS:SIMPLE_DATE_FORMAT:yyyy-MM-dd HH:mm:ss.SSS', "
-          + "'2021-11-07 3:00:00.000', '2021-11-07 12:00:00.000', '1:HOURS') AS time_col, "
-          + "lotId, "
-          + "FILL(lastWithTime(isOccupied, eventTime, 'BOOLEAN'), 'FILL_PREVIOUS_VALUE') as status1, "
-          + "FILL(lastWithTime(isOccupied, eventTime, 'BOOLEAN'), 'FILL_DEFAULT_VALUE') as status2, "
-          + "lastWithTime(isOccupied, eventTime, 'BOOLEAN') as status3 "
-          + "FROM parkingData "
-          + "WHERE eventTime >= 1635940800000 AND eventTime <= 1636286400000 "
-          + "GROUP BY 1, 2 "
-          + "LIMIT 200";
-
-      getBrokerResponseForSqlQuery(gapfillQuery);
-      Assert.fail();
-    } catch (IllegalArgumentException e) {
-      Assert.assertEquals(e.getMessage(), "PostAggregateGapFill does not work if the time bucket is not ordered.");
-    }
-  }
-
-  @Test
-  public void datetimeconvertGapfillTestWithHavingClause() {
-    String dataTimeConvertQueryWithUnoccupied = "SELECT "
-        + "DATETIMECONVERT(eventTime, '1:MILLISECONDS:EPOCH', "
-        + "'1:MILLISECONDS:SIMPLE_DATE_FORMAT:yyyy-MM-dd HH:mm:ss.SSS', '1:HOURS') AS time_col, "
-        + "lotId, "
-        + "lastWithTime(isOccupied, eventTime, 'BOOLEAN') as status "
-        + "FROM parkingData "
-        + "WHERE eventTime >= 1635940800000 AND eventTime <= 1636286400000 "
-        + "GROUP BY 1, 2 "
-        + "HAVING status = 'false' "
-        + "ORDER BY 1 "
-        + "LIMIT 200";
-
-    BrokerResponseNative dateTimeConvertBrokerResponseWithUnoccupied
-        = getBrokerResponseForSqlQuery(dataTimeConvertQueryWithUnoccupied);
-
-    ResultTable dateTimeConvertResultTableWithUnoccupied = dateTimeConvertBrokerResponseWithUnoccupied.getResultTable();
-    Assert.assertEquals(dateTimeConvertResultTableWithUnoccupied.getRows().size(), 20);
-
-    String dataTimeConvertQueryWithOccupied = "SELECT "
-        + "DATETIMECONVERT(eventTime, '1:MILLISECONDS:EPOCH', "
-        + "'1:MILLISECONDS:SIMPLE_DATE_FORMAT:yyyy-MM-dd HH:mm:ss.SSS', '1:HOURS') AS time_col, "
-        + "lotId, "
-        + "lastWithTime(isOccupied, eventTime, 'BOOLEAN') as status "
-        + "FROM parkingData "
-        + "WHERE eventTime >= 1635940800000 AND eventTime <= 1636286400000 "
-        + "GROUP BY 1, 2 "
-        + "HAVING status = 'true' "
-        + "ORDER BY 1 "
-        + "LIMIT 200";
-
-    BrokerResponseNative dateTimeConvertBrokerResponseWithOccupied
-        = getBrokerResponseForSqlQuery(dataTimeConvertQueryWithOccupied);
-
-    ResultTable dateTimeConvertResultTableWithOccupied = dateTimeConvertBrokerResponseWithOccupied.getResultTable();
-    Assert.assertEquals(dateTimeConvertResultTableWithOccupied.getRows().size(), 4);
-
-    String gapfillQueryWithOccupied = "SELECT "
-        + "PostAggregateGapFill(DATETIMECONVERT(eventTime, '1:MILLISECONDS:EPOCH', "
-        + "'1:MILLISECONDS:SIMPLE_DATE_FORMAT:yyyy-MM-dd HH:mm:ss.SSS', '1:HOURS'), "
-        + "'1:MILLISECONDS:SIMPLE_DATE_FORMAT:yyyy-MM-dd HH:mm:ss.SSS', "
-        + "'2021-11-07 3:00:00.000', '2021-11-07 12:00:00.000', '1:HOURS') AS time_col, "
-        + "lotId, "
-        + "FILL(lastWithTime(isOccupied, eventTime, 'BOOLEAN'), 'FILL_PREVIOUS_VALUE') as status "
-        + "FROM parkingData "
-        + "WHERE eventTime >= 1635940800000 AND eventTime <= 1636286400000 "
-        + "GROUP BY 1, 2 "
-        + "HAVING status = 'true' "
-        + "ORDER BY 1 "
-        + "LIMIT 7";
-
-    BrokerResponseNative gapfillBrokerResponseWithOccupied = getBrokerResponseForSqlQuery(gapfillQueryWithOccupied);
-
-    ResultTable gapFillResultTableWithOccupied = gapfillBrokerResponseWithOccupied.getResultTable();
-    Assert.assertEquals(gapFillResultTableWithOccupied.getRows().size(), 7);
-
-    for (Object [] row : gapFillResultTableWithOccupied.getRows()) {
-      Assert.assertEquals(row[2], true);
-    }
-
-    String gapfillQueryWithUnoccupied = "SELECT "
-        + "PostAggregateGapFill(DATETIMECONVERT(eventTime, '1:MILLISECONDS:EPOCH', "
-        + "'1:MILLISECONDS:SIMPLE_DATE_FORMAT:yyyy-MM-dd HH:mm:ss.SSS', '1:HOURS'), "
-        + "'1:MILLISECONDS:SIMPLE_DATE_FORMAT:yyyy-MM-dd HH:mm:ss.SSS', "
-        + "'2021-11-07 3:00:00.000', '2021-11-07 12:00:00.000', '1:HOURS') AS time_col, "
-        + "lotId, "
-        + "FILL(lastWithTime(isOccupied, eventTime, 'BOOLEAN'), 'FILL_PREVIOUS_VALUE') as status "
-        + "FROM parkingData "
-        + "WHERE eventTime >= 1635940800000 AND eventTime <= 1636286400000 "
-        + "GROUP BY 1, 2 "
-        + "HAVING status = 'false' "
-        + "ORDER BY 1 "
-        + "LIMIT 24";
24"; - - BrokerResponseNative gapfillBrokerResponseWithUnoccupied = getBrokerResponseForSqlQuery(gapfillQueryWithUnoccupied); - - ResultTable gapFillResultTableWithUnoccupied = gapfillBrokerResponseWithUnoccupied.getResultTable(); - Assert.assertEquals(gapFillResultTableWithUnoccupied.getRows().size(), 24); - for (Object [] row : gapFillResultTableWithUnoccupied.getRows()) { - Assert.assertEquals(row[2], false); - } - } - - - @Test - public void datetimeconvertGapfillTestTimeBucketAsLastSelection() { - String gapfillQuery = "SELECT " - + "FILL(lastWithTime(isOccupied, eventTime, 'BOOLEAN'), 'FILL_PREVIOUS_VALUE') as status1, " - + "FILL(lastWithTime(isOccupied, eventTime, 'BOOLEAN'), 'FILL_DEFAULT_VALUE') as status2, " - + "lastWithTime(isOccupied, eventTime, 'BOOLEAN') as status3, " - + "lotId, PostAggregateGapFill(DATETIMECONVERT(eventTime, '1:MILLISECONDS:EPOCH', " - + "'1:MILLISECONDS:SIMPLE_DATE_FORMAT:yyyy-MM-dd HH:mm:ss.SSS', '1:HOURS'), " - + "'1:MILLISECONDS:SIMPLE_DATE_FORMAT:yyyy-MM-dd HH:mm:ss.SSS', " - + "'2021-11-07 3:00:00.000', '2021-11-07 12:00:00.000', '1:HOURS') AS time_col " - + "FROM parkingData " - + "WHERE eventTime >= 1635940800000 AND eventTime <= 1636286400000 " - + "GROUP BY 4, 5 " - + "ORDER BY 5 " - + "LIMIT 200"; - - DateTimeFormatSpec dateTimeFormatter - = new DateTimeFormatSpec("1:MILLISECONDS:SIMPLE_DATE_FORMAT:yyyy-MM-dd HH:mm:ss.SSS"); - DateTimeGranularitySpec dateTimeGranularity = new DateTimeGranularitySpec("1:HOURS"); - - BrokerResponseNative gapfillBrokerResponse = getBrokerResponseForSqlQuery(gapfillQuery); - - ResultTable gapFillResultTable = gapfillBrokerResponse.getResultTable(); - Assert.assertEquals(gapFillResultTable.getRows().size(), 32); - List<Object[]> gapFillRows = gapFillResultTable.getRows(); - long start = dateTimeFormatter.fromFormatToMillis("2021-11-07 03:00:00.000"); - for (int i = 0; i < 32; i += 4) { - String timeCol = (String) gapFillRows.get(i)[4]; - long timeStamp = dateTimeFormatter.fromFormatToMillis(timeCol); - Assert.assertEquals(timeStamp, start); - Set<String> lots = new HashSet<>(); - lots.add((String) gapFillRows.get(i)[3]); - for (int j = 1; j < 4; j++) { - Assert.assertEquals(gapFillRows.get(i)[4], gapFillRows.get(i + j)[4]); - Assert.assertFalse(lots.contains(gapFillRows.get(i + j)[3])); - lots.add((String) gapFillRows.get(i + j)[3]); - } - start += dateTimeGranularity.granularityToMillis(); - } - } - - @Test - public void datetimeconvertGapfillWithOrderingByTwoColumnsTest() { - String gapfillQuery = "SELECT " - + "PostAggregateGapFill(DATETIMECONVERT(eventTime, '1:MILLISECONDS:EPOCH', " - + "'1:MILLISECONDS:SIMPLE_DATE_FORMAT:yyyy-MM-dd HH:mm:ss.SSS', '1:HOURS'), " - + "'1:MILLISECONDS:SIMPLE_DATE_FORMAT:yyyy-MM-dd HH:mm:ss.SSS', " - + "'2021-11-07 3:00:00.000', '2021-11-07 12:00:00.000', '1:HOURS') AS time_col, " - + "lotId, " - + "FILL(lastWithTime(isOccupied, eventTime, 'BOOLEAN'), 'FILL_PREVIOUS_VALUE') as status1, " - + "FILL(lastWithTime(isOccupied, eventTime, 'BOOLEAN'), 'FILL_DEFAULT_VALUE') as status2, " - + "lastWithTime(isOccupied, eventTime, 'BOOLEAN') as status3 " - + "FROM parkingData " - + "WHERE eventTime >= 1635940800000 AND eventTime <= 1636286400000 " - + "GROUP BY 1, 2 " - + "ORDER BY 1, 2 " - + "LIMIT 200"; - - DateTimeFormatSpec dateTimeFormatter - = new DateTimeFormatSpec("1:MILLISECONDS:SIMPLE_DATE_FORMAT:yyyy-MM-dd HH:mm:ss.SSS"); - DateTimeGranularitySpec dateTimeGranularity = new DateTimeGranularitySpec("1:HOURS"); - - BrokerResponseNative gapfillBrokerResponse = 
-
-    ResultTable gapFillResultTable = gapfillBrokerResponse.getResultTable();
-    Assert.assertEquals(gapFillResultTable.getRows().size(), 32);
-    List<Object[]> gapFillRows = gapFillResultTable.getRows();
-    long start = dateTimeFormatter.fromFormatToMillis("2021-11-07 03:00:00.000");
-    for (int i = 0; i < 32; i += 4) {
-      String firstTimeCol = (String) gapFillRows.get(i)[0];
-      long timeStamp = dateTimeFormatter.fromFormatToMillis(firstTimeCol);
-      Assert.assertEquals(timeStamp, start);
-      Set<String> lots = new HashSet<>();
-      lots.add((String) gapFillRows.get(i)[1]);
-      for (int j = 1; j < 4; j++) {
-        Assert.assertEquals(gapFillRows.get(i)[0], gapFillRows.get(i + j)[0]);
-        Assert.assertFalse(lots.contains(gapFillRows.get(i + j)[1]));
-        lots.add((String) gapFillRows.get(i + j)[1]);
-      }
-      start += dateTimeGranularity.granularityToMillis();
-    }
-  }
-
-  @AfterClass
-  public void tearDown()
-      throws IOException {
-    _indexSegment.destroy();
-    FileUtils.deleteDirectory(INDEX_DIR);
-  }
-}
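
Note: gapfill itself is not removed by this commit; only the PostAggregateGapFill selection function and its reducer are. The surviving GAPFILL/FILL path is still validated in GapfillUtils above (more than five arguments; literal time format, start, end, and bucket size; plus a required TimeSeriesOn expression). As a rough sketch only, following the query shape exercised in GapfillQueriesTest (the subquery form and the TIMESERIESON argument are assumptions drawn from those tests, not from this diff), a deprecated query could be rewritten along these lines:

    SELECT GAPFILL(time_col, '1:MILLISECONDS:SIMPLE_DATE_FORMAT:yyyy-MM-dd HH:mm:ss.SSS',
                   '2021-11-07 3:00:00.000', '2021-11-07 12:00:00.000', '1:HOURS',
                   FILL(status, 'FILL_PREVIOUS_VALUE'), TIMESERIESON(lotId)) AS time_col,
           lotId, status
    FROM (
        SELECT DATETIMECONVERT(eventTime, '1:MILLISECONDS:EPOCH',
                   '1:MILLISECONDS:SIMPLE_DATE_FORMAT:yyyy-MM-dd HH:mm:ss.SSS', '1:HOURS') AS time_col,
               lotId,
               lastWithTime(isOccupied, eventTime, 'BOOLEAN') AS status
        FROM parkingData
        WHERE eventTime >= 1635940800000 AND eventTime <= 1636286400000
        GROUP BY 1, 2
        LIMIT 200)
    LIMIT 200

This shape satisfies the retained Preconditions checks: seven arguments (more than five), literals for the time format, start, end, and bucket size, and an explicit TimeSeriesOn expression.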