This is an automated email from the ASF dual-hosted git repository. ankitsultana pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push: new c5fbe5b23d [timeseries] Make Time Buckets Half Open on the Left (#14413) c5fbe5b23d is described below commit c5fbe5b23d4ab7ac7819b7a9744eb9e3b9562f8c Author: Ankit Sultana <ankitsult...@uber.com> AuthorDate: Mon Nov 11 15:24:47 2024 -0600 [timeseries] Make Time Buckets Half Open on the Left (#14413) --- .../timeseries/TimeSeriesAggregationOperator.java | 38 ++++----- .../TimeSeriesAggregationOperatorTest.java | 96 ++++++++++++++++++++++ .../core/query/executor/QueryExecutorTest.java | 6 +- .../query/service/dispatch/QueryDispatcher.java | 11 ++- .../PhysicalTimeSeriesPlanVisitorTest.java | 4 +- .../org/apache/pinot/tsdb/spi/TimeBuckets.java | 37 ++++++--- .../tsdb/spi/plan/LeafTimeSeriesPlanNode.java | 12 ++- .../org/apache/pinot/tsdb/spi/TimeBucketsTest.java | 53 ++++++++++++ .../tsdb/spi/plan/LeafTimeSeriesPlanNodeTest.java | 12 +-- 9 files changed, 214 insertions(+), 55 deletions(-) diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/timeseries/TimeSeriesAggregationOperator.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/timeseries/TimeSeriesAggregationOperator.java index 25ee168ef8..fd895a4607 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/operator/timeseries/TimeSeriesAggregationOperator.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/timeseries/TimeSeriesAggregationOperator.java @@ -18,6 +18,7 @@ */ package org.apache.pinot.core.operator.timeseries; +import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.ImmutableList; import java.time.Duration; import java.util.HashMap; @@ -51,7 +52,7 @@ public class TimeSeriesAggregationOperator extends BaseOperator<TimeSeriesResult private static final String EXPLAIN_NAME = "TIME_SERIES_AGGREGATION"; private final String _timeColumn; private final TimeUnit _storedTimeUnit; - private final Long _timeOffset; + private final long _timeOffset; private final 
AggInfo _aggInfo; private final ExpressionContext _valueExpression; private final List<String> _groupByExpressions; @@ -62,7 +63,7 @@ public class TimeSeriesAggregationOperator extends BaseOperator<TimeSeriesResult public TimeSeriesAggregationOperator( String timeColumn, TimeUnit timeUnit, - Long timeOffsetSeconds, + @Nullable Long timeOffsetSeconds, AggInfo aggInfo, ExpressionContext valueExpression, List<String> groupByExpressions, @@ -71,7 +72,7 @@ public class TimeSeriesAggregationOperator extends BaseOperator<TimeSeriesResult TimeSeriesBuilderFactory seriesBuilderFactory) { _timeColumn = timeColumn; _storedTimeUnit = timeUnit; - _timeOffset = timeUnit.convert(Duration.ofSeconds(timeOffsetSeconds)); + _timeOffset = timeOffsetSeconds == null ? 0L : timeUnit.convert(Duration.ofSeconds(timeOffsetSeconds)); _aggInfo = aggInfo; _valueExpression = valueExpression; _groupByExpressions = groupByExpressions; @@ -89,10 +90,8 @@ public class TimeSeriesAggregationOperator extends BaseOperator<TimeSeriesResult // TODO: This is quite unoptimized and allocates liberally BlockValSet blockValSet = valueBlock.getBlockValueSet(_timeColumn); long[] timeValues = blockValSet.getLongValuesSV(); - if (_timeOffset != null && _timeOffset != 0L) { - timeValues = applyTimeshift(_timeOffset, timeValues, numDocs); - } - int[] timeValueIndexes = getTimeValueIndex(timeValues, _storedTimeUnit, numDocs); + applyTimeOffset(timeValues, numDocs); + int[] timeValueIndexes = getTimeValueIndex(timeValues, numDocs); Object[][] tagValues = new Object[_groupByExpressions.size()][]; for (int i = 0; i < _groupByExpressions.size(); i++) { blockValSet = valueBlock.getBlockValueSet(_groupByExpressions.get(i)); @@ -152,23 +151,26 @@ public class TimeSeriesAggregationOperator extends BaseOperator<TimeSeriesResult return new ExecutionStatistics(0, 0, 0, 0); } - private int[] getTimeValueIndex(long[] actualTimeValues, TimeUnit timeUnit, int numDocs) { - if (timeUnit == TimeUnit.MILLISECONDS) { + 
@VisibleForTesting + protected int[] getTimeValueIndex(long[] actualTimeValues, int numDocs) { + if (_storedTimeUnit == TimeUnit.MILLISECONDS) { return getTimeValueIndexMillis(actualTimeValues, numDocs); } int[] timeIndexes = new int[numDocs]; + final long reference = _timeBuckets.getTimeRangeStartExclusive(); + final long divisor = _timeBuckets.getBucketSize().getSeconds(); for (int index = 0; index < numDocs; index++) { - timeIndexes[index] = (int) ((actualTimeValues[index] - _timeBuckets.getStartTime()) - / _timeBuckets.getBucketSize().getSeconds()); + timeIndexes[index] = (int) ((actualTimeValues[index] - reference - 1) / divisor); } return timeIndexes; } private int[] getTimeValueIndexMillis(long[] actualTimeValues, int numDocs) { int[] timeIndexes = new int[numDocs]; + final long reference = _timeBuckets.getTimeRangeStartExclusive() * 1000L; + final long divisor = _timeBuckets.getBucketSize().toMillis(); for (int index = 0; index < numDocs; index++) { - timeIndexes[index] = (int) ((actualTimeValues[index] - _timeBuckets.getStartTime() * 1000L) - / _timeBuckets.getBucketSize().toMillis()); + timeIndexes[index] = (int) ((actualTimeValues[index] - reference - 1) / divisor); } return timeIndexes; } @@ -240,14 +242,12 @@ public class TimeSeriesAggregationOperator extends BaseOperator<TimeSeriesResult } } - public static long[] applyTimeshift(long timeshift, long[] timeValues, int numDocs) { - if (timeshift == 0) { - return timeValues; + private void applyTimeOffset(long[] timeValues, int numDocs) { + if (_timeOffset == 0L) { + return; } - long[] shiftedTimeValues = new long[numDocs]; for (int index = 0; index < numDocs; index++) { - shiftedTimeValues[index] = timeValues[index] + timeshift; + timeValues[index] = timeValues[index] + _timeOffset; } - return shiftedTimeValues; } } diff --git a/pinot-core/src/test/java/org/apache/pinot/core/operator/timeseries/TimeSeriesAggregationOperatorTest.java 
b/pinot-core/src/test/java/org/apache/pinot/core/operator/timeseries/TimeSeriesAggregationOperatorTest.java new file mode 100644 index 0000000000..888d0658e9 --- /dev/null +++ b/pinot-core/src/test/java/org/apache/pinot/core/operator/timeseries/TimeSeriesAggregationOperatorTest.java @@ -0,0 +1,96 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.pinot.core.operator.timeseries; + +import java.time.Duration; +import java.util.Collections; +import java.util.concurrent.TimeUnit; +import org.apache.pinot.common.request.context.ExpressionContext; +import org.apache.pinot.core.operator.BaseProjectOperator; +import org.apache.pinot.tsdb.spi.AggInfo; +import org.apache.pinot.tsdb.spi.TimeBuckets; +import org.apache.pinot.tsdb.spi.series.TimeSeriesBuilderFactory; +import org.testng.annotations.Test; + +import static org.mockito.Mockito.*; +import static org.testng.Assert.*; + + +public class TimeSeriesAggregationOperatorTest { + private static final String DUMMY_TIME_COLUMN = "someTimeColumn"; + private static final AggInfo AGG_INFO = new AggInfo("", Collections.emptyMap()); + private static final ExpressionContext VALUE_EXPRESSION = ExpressionContext.forIdentifier("someValueColumn"); + + @Test + public void testGetTimeValueIndexForSeconds() { + /* + * TimeBuckets: [10_000, 10_100, 10_200, ..., 10_900] + * storedTimeValues: [9_999, 10_000, 9_999, 9_901, 10_100, 10_899, 10_900] + * expected indexes: [0, 0, 0, 0, 1, 9, 9] + */ + final int[] expectedIndexes = new int[]{0, 0, 0, 0, 1, 9, 9}; + final TimeUnit storedTimeUnit = TimeUnit.SECONDS; + TimeBuckets timeBuckets = TimeBuckets.ofSeconds(10_000, Duration.ofSeconds(100), 10); + TimeSeriesAggregationOperator aggregationOperator = buildOperator(storedTimeUnit, timeBuckets); + long[] storedTimeValues = new long[]{9_999L, 10_000L, 9_999L, 9_901L, 10_100L, 10_899L, 10_900L}; + int[] indexes = aggregationOperator.getTimeValueIndex(storedTimeValues, storedTimeValues.length); + assertEquals(indexes, expectedIndexes); + } + + @Test + public void testGetTimeValueIndexForMillis() { + /* + * TimeBuckets: [10_000, 10_100, 10_200, ..., 10_900] + * storedTimeValues: [9_999_000, 10_000_000, 10_500_000, 10_899_999, 10_800_001, 10_900_000] + * expected indexes: [0, 0, 5, 9, 9, 9] + */ + final int[] expectedIndexes = new int[]{0, 0, 5, 9, 9, 9}; + final 
TimeUnit storedTimeUnit = TimeUnit.MILLISECONDS; + TimeBuckets timeBuckets = TimeBuckets.ofSeconds(10_000, Duration.ofSeconds(100), 10); + TimeSeriesAggregationOperator aggregationOperator = buildOperator(storedTimeUnit, timeBuckets); + long[] storedTimeValues = new long[]{9_999_000L, 10_000_000L, 10_500_000L, 10_899_999L, 10_800_001L, 10_900_000L}; + int[] indexes = aggregationOperator.getTimeValueIndex(storedTimeValues, storedTimeValues.length); + assertEquals(indexes, expectedIndexes); + } + + @Test + public void testGetTimeValueIndexOutOfBounds() { + final TimeUnit storedTimeUnit = TimeUnit.SECONDS; + final int numTimeBuckets = 10; + final int windowSeconds = 100; + TimeBuckets timeBuckets = TimeBuckets.ofSeconds(10_000, Duration.ofSeconds(windowSeconds), numTimeBuckets); + TimeSeriesAggregationOperator aggregationOperator = buildOperator(storedTimeUnit, timeBuckets); + testOutOfBoundsTimeValueIndex(new long[]{8_000}, numTimeBuckets, aggregationOperator); + testOutOfBoundsTimeValueIndex(new long[]{timeBuckets.getTimeRangeEndInclusive() + 1}, numTimeBuckets, + aggregationOperator); + } + + private void testOutOfBoundsTimeValueIndex(long[] storedTimeValues, int numTimeBuckets, + TimeSeriesAggregationOperator aggOperator) { + assertEquals(storedTimeValues.length, 1, "Misconfigured test: pass single stored time value"); + int[] indexes = aggOperator.getTimeValueIndex(storedTimeValues, storedTimeValues.length); + assertTrue(indexes[0] < 0 || indexes[0] >= numTimeBuckets, "Expected time index to spill beyond valid range"); + } + + private TimeSeriesAggregationOperator buildOperator(TimeUnit storedTimeUnit, TimeBuckets timeBuckets) { + return new TimeSeriesAggregationOperator( + DUMMY_TIME_COLUMN, storedTimeUnit, 0L, AGG_INFO, VALUE_EXPRESSION, Collections.emptyList(), + timeBuckets, mock(BaseProjectOperator.class), mock(TimeSeriesBuilderFactory.class)); + } +} diff --git a/pinot-core/src/test/java/org/apache/pinot/core/query/executor/QueryExecutorTest.java 
b/pinot-core/src/test/java/org/apache/pinot/core/query/executor/QueryExecutorTest.java index 457e916b6d..404bd3efc3 100644 --- a/pinot-core/src/test/java/org/apache/pinot/core/query/executor/QueryExecutorTest.java +++ b/pinot-core/src/test/java/org/apache/pinot/core/query/executor/QueryExecutorTest.java @@ -82,6 +82,7 @@ import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; import static org.testng.Assert.assertEquals; import static org.testng.Assert.assertFalse; +import static org.testng.Assert.assertNull; import static org.testng.Assert.assertTrue; @@ -217,7 +218,7 @@ public class QueryExecutorTest { @Test public void testTimeSeriesSumQuery() { - TimeBuckets timeBuckets = TimeBuckets.ofSeconds(TIME_SERIES_TEST_START_TIME, Duration.ofHours(2), 1); + TimeBuckets timeBuckets = TimeBuckets.ofSeconds(TIME_SERIES_TEST_START_TIME, Duration.ofHours(2), 2); ExpressionContext valueExpression = ExpressionContext.forIdentifier("orderAmount"); TimeSeriesContext timeSeriesContext = new TimeSeriesContext(TIME_SERIES_LANGUAGE_NAME, TIME_SERIES_TIME_COL_NAME, TimeUnit.SECONDS, timeBuckets, @@ -230,7 +231,8 @@ public class QueryExecutorTest { TimeSeriesResultsBlock resultsBlock = (TimeSeriesResultsBlock) instanceResponse.getResultsBlock(); TimeSeriesBlock timeSeriesBlock = resultsBlock.getTimeSeriesBuilderBlock().build(); assertEquals(timeSeriesBlock.getSeriesMap().size(), 1); - assertEquals(timeSeriesBlock.getSeriesMap().values().iterator().next().get(0).getValues()[0], 29885544.0); + assertNull(timeSeriesBlock.getSeriesMap().values().iterator().next().get(0).getValues()[0]); + assertEquals(timeSeriesBlock.getSeriesMap().values().iterator().next().get(0).getValues()[1], 29885544.0); } @Test diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/dispatch/QueryDispatcher.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/dispatch/QueryDispatcher.java index 1abe32d11b..fc10c96346 100644 --- 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/dispatch/QueryDispatcher.java +++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/dispatch/QueryDispatcher.java @@ -82,6 +82,7 @@ import org.apache.pinot.tsdb.planner.TimeSeriesPlanConstants.WorkerRequestMetada import org.apache.pinot.tsdb.planner.TimeSeriesPlanConstants.WorkerResponseMetadataKeys; import org.apache.pinot.tsdb.planner.physical.TimeSeriesDispatchablePlan; import org.apache.pinot.tsdb.planner.physical.TimeSeriesQueryServerInstance; +import org.apache.pinot.tsdb.spi.TimeBuckets; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -301,13 +302,11 @@ public class QueryDispatcher { Map<String, String> initializeTimeSeriesMetadataMap(TimeSeriesDispatchablePlan dispatchablePlan) { Map<String, String> result = new HashMap<>(); + TimeBuckets timeBuckets = dispatchablePlan.getTimeBuckets(); result.put(WorkerRequestMetadataKeys.LANGUAGE, dispatchablePlan.getLanguage()); - result.put(WorkerRequestMetadataKeys.START_TIME_SECONDS, - Long.toString(dispatchablePlan.getTimeBuckets().getStartTime())); - result.put(WorkerRequestMetadataKeys.WINDOW_SECONDS, - Long.toString(dispatchablePlan.getTimeBuckets().getBucketSize().getSeconds())); - result.put(WorkerRequestMetadataKeys.NUM_ELEMENTS, - Long.toString(dispatchablePlan.getTimeBuckets().getTimeBuckets().length)); + result.put(WorkerRequestMetadataKeys.START_TIME_SECONDS, Long.toString(timeBuckets.getTimeBuckets()[0])); + result.put(WorkerRequestMetadataKeys.WINDOW_SECONDS, Long.toString(timeBuckets.getBucketSize().getSeconds())); + result.put(WorkerRequestMetadataKeys.NUM_ELEMENTS, Long.toString(timeBuckets.getTimeBuckets().length)); for (Map.Entry<String, List<String>> entry : dispatchablePlan.getPlanIdToSegments().entrySet()) { result.put(WorkerRequestMetadataKeys.encodeSegmentListKey(entry.getKey()), String.join(",", entry.getValue())); } diff --git 
a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/timeseries/PhysicalTimeSeriesPlanVisitorTest.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/timeseries/PhysicalTimeSeriesPlanVisitorTest.java index 8f877aec01..c9a687c5ff 100644 --- a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/timeseries/PhysicalTimeSeriesPlanVisitorTest.java +++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/timeseries/PhysicalTimeSeriesPlanVisitorTest.java @@ -54,7 +54,7 @@ public class PhysicalTimeSeriesPlanVisitorTest { assertEquals(queryContext.getTimeSeriesContext().getTimeColumn(), timeColumn); assertEquals(queryContext.getTimeSeriesContext().getValueExpression().getIdentifier(), "orderCount"); assertEquals(queryContext.getFilter().toString(), - "(cityName = 'Chicago' AND orderTime >= '1000' AND orderTime < '2000')"); + "(cityName = 'Chicago' AND orderTime > '990' AND orderTime <= '1990')"); } // Case-2: With offset, complex group-by expression, complex value, and non-empty filter { @@ -75,7 +75,7 @@ public class PhysicalTimeSeriesPlanVisitorTest { assertEquals(queryContext.getTimeSeriesContext().getValueExpression().toString(), "times(orderCount,'2')"); assertNotNull(queryContext.getFilter()); assertEquals(queryContext.getFilter().toString(), - "(cityName = 'Chicago' AND orderTime >= '990' AND orderTime < '1990')"); + "(cityName = 'Chicago' AND orderTime > '980' AND orderTime <= '1980')"); } } } diff --git a/pinot-timeseries/pinot-timeseries-spi/src/main/java/org/apache/pinot/tsdb/spi/TimeBuckets.java b/pinot-timeseries/pinot-timeseries-spi/src/main/java/org/apache/pinot/tsdb/spi/TimeBuckets.java index a92eb3bcc2..0dba0ec828 100644 --- a/pinot-timeseries/pinot-timeseries-spi/src/main/java/org/apache/pinot/tsdb/spi/TimeBuckets.java +++ b/pinot-timeseries/pinot-timeseries-spi/src/main/java/org/apache/pinot/tsdb/spi/TimeBuckets.java @@ -25,7 +25,7 @@ import java.util.Objects; /** * Time buckets used for query 
execution. Each element (say x) in the {@link #getTimeBuckets()} array represents a - * time-range which is half open on the right side: [x, x + bucketSize.getSeconds()). Some query languages allow some + * time-range which is half open on the left side: (x - bucketSize.getSeconds(), x]. Some query languages allow some * operators to mutate the time-buckets on the fly, so it is not guaranteed that the time resolution and/or range * will be the same across all operators. For instance, Uber's M3QL supports a "summarize 1h sum" operator which will * change the bucket resolution to 1 hour for all subsequent operators. @@ -47,16 +47,16 @@ public class TimeBuckets { return _bucketSize; } - public long getStartTime() { - return _timeBuckets[0]; + public long getTimeRangeStartExclusive() { + return _timeBuckets[0] - _bucketSize.getSeconds(); } - public long getEndTime() { + public long getTimeRangeEndInclusive() { return _timeBuckets[_timeBuckets.length - 1]; } public long getRangeSeconds() { - return _timeBuckets[_timeBuckets.length - 1] - _timeBuckets[0] + _bucketSize.getSeconds(); + return getTimeRangeEndInclusive() - getTimeRangeStartExclusive(); } public int getNumBuckets() { @@ -67,13 +67,12 @@ public class TimeBuckets { if (_timeBuckets.length == 0) { return -1; } - if (timeValue < _timeBuckets[0]) { + if (timeValue <= getTimeRangeStartExclusive() || timeValue > getTimeRangeEndInclusive()) { return -1; } - if (timeValue >= _timeBuckets[_timeBuckets.length - 1] + _bucketSize.getSeconds()) { - return -1; - } - return (int) ((timeValue - _timeBuckets[0]) / _bucketSize.getSeconds()); + long offsetFromRangeStart = timeValue - getTimeRangeStartExclusive(); + // Subtract 1 from the offset because we have intervals half-open on the left. 
+ return (int) ((offsetFromRangeStart - 1) / _bucketSize.getSeconds()); } @Override @@ -82,7 +81,8 @@ public class TimeBuckets { return false; } TimeBuckets other = (TimeBuckets) o; - return this.getStartTime() == other.getStartTime() && this.getEndTime() == other.getEndTime() + return this.getTimeRangeStartExclusive() == other.getTimeRangeStartExclusive() + && this.getTimeRangeEndInclusive() == other.getTimeRangeEndInclusive() && this.getBucketSize().equals(other.getBucketSize()); } @@ -93,11 +93,22 @@ public class TimeBuckets { return result; } - public static TimeBuckets ofSeconds(long startTimeSeconds, Duration bucketSize, int numElements) { + /** + * Creates time-buckets, with the first bucket value being firstBucketValue (FBV). The time range represented + * by the buckets is: + * <pre> + * (FBV - bucketSize.getSeconds(), FBV + (numElements - 1) * bucketSize.getSeconds()] + * </pre> + * The raw Long[] time values are: + * <pre> + * FBV, FBV + bucketSize.getSeconds(), ... , FBV + (numElements - 1) * bucketSize.getSeconds() + * </pre> + */ + public static TimeBuckets ofSeconds(long firstBucketValue, Duration bucketSize, int numElements) { long stepSize = bucketSize.getSeconds(); Long[] timeBuckets = new Long[numElements]; for (int i = 0; i < numElements; i++) { - timeBuckets[i] = startTimeSeconds + i * stepSize; + timeBuckets[i] = firstBucketValue + i * stepSize; } return new TimeBuckets(timeBuckets, bucketSize); } diff --git a/pinot-timeseries/pinot-timeseries-spi/src/main/java/org/apache/pinot/tsdb/spi/plan/LeafTimeSeriesPlanNode.java b/pinot-timeseries/pinot-timeseries-spi/src/main/java/org/apache/pinot/tsdb/spi/plan/LeafTimeSeriesPlanNode.java index 856b81622a..773d675242 100644 --- a/pinot-timeseries/pinot-timeseries-spi/src/main/java/org/apache/pinot/tsdb/spi/plan/LeafTimeSeriesPlanNode.java +++ b/pinot-timeseries/pinot-timeseries-spi/src/main/java/org/apache/pinot/tsdb/spi/plan/LeafTimeSeriesPlanNode.java @@ -115,14 +115,12 @@ public class
LeafTimeSeriesPlanNode extends BaseTimeSeriesPlanNode { public String getEffectiveFilter(TimeBuckets timeBuckets) { String filter = _filterExpression == null ? "" : _filterExpression; - long startTime = _timeUnit.convert(Duration.ofSeconds(timeBuckets.getStartTime() - _offsetSeconds)); - long endTime = - _timeUnit.convert(Duration.ofSeconds( - timeBuckets.getEndTime() + timeBuckets.getBucketSize().toSeconds() - _offsetSeconds)); - String addnFilter = String.format("%s >= %d AND %s < %d", _timeColumn, startTime, _timeColumn, endTime); + long startTime = _timeUnit.convert(Duration.ofSeconds(timeBuckets.getTimeRangeStartExclusive() - _offsetSeconds)); + long endTime = _timeUnit.convert(Duration.ofSeconds(timeBuckets.getTimeRangeEndInclusive() - _offsetSeconds)); + String timeFilter = String.format("%s > %d AND %s <= %d", _timeColumn, startTime, _timeColumn, endTime); if (filter.strip().isEmpty()) { - return addnFilter; + return timeFilter; } - return String.format("(%s) AND (%s)", filter, addnFilter); + return String.format("(%s) AND (%s)", filter, timeFilter); } } diff --git a/pinot-timeseries/pinot-timeseries-spi/src/test/java/org/apache/pinot/tsdb/spi/TimeBucketsTest.java b/pinot-timeseries/pinot-timeseries-spi/src/test/java/org/apache/pinot/tsdb/spi/TimeBucketsTest.java new file mode 100644 index 0000000000..27d4ce7543 --- /dev/null +++ b/pinot-timeseries/pinot-timeseries-spi/src/test/java/org/apache/pinot/tsdb/spi/TimeBucketsTest.java @@ -0,0 +1,53 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.tsdb.spi; + +import java.time.Duration; +import org.testng.annotations.Test; + +import static org.testng.Assert.*; + + +public class TimeBucketsTest { + @Test + public void testTimeBucketsSemantics() { + /* + * time-bucket values: [10_000, 10_100, 10_200, ... , 10_900] + */ + final int firstBucketValue = 10_000; + final int bucketSize = 100; + final int numElements = 10; + TimeBuckets timeBuckets = TimeBuckets.ofSeconds(firstBucketValue, Duration.ofSeconds(bucketSize), numElements); + assertEquals(timeBuckets.getNumBuckets(), numElements); + assertEquals(timeBuckets.getBucketSize().getSeconds(), bucketSize); + assertEquals(timeBuckets.getTimeRangeStartExclusive(), firstBucketValue - bucketSize); + assertEquals(timeBuckets.getTimeRangeEndInclusive(), firstBucketValue + (numElements - 1) * bucketSize); + assertEquals(timeBuckets.getRangeSeconds(), + timeBuckets.getTimeRangeEndInclusive() - timeBuckets.getTimeRangeStartExclusive()); + assertEquals(timeBuckets.resolveIndex(10_000), 0); + assertEquals(timeBuckets.resolveIndex(9_999), 0); + assertEquals(timeBuckets.resolveIndex(9_901), 0); + assertEquals(timeBuckets.resolveIndex(10_100), 1); + assertEquals(timeBuckets.resolveIndex(10_101), 2); + assertEquals(timeBuckets.resolveIndex(10_900), 9); + // Test out of bound indexes + assertEquals(timeBuckets.resolveIndex(9_900), -1); + assertEquals(timeBuckets.resolveIndex(10_901), -1); + } +} diff --git a/pinot-timeseries/pinot-timeseries-spi/src/test/java/org/apache/pinot/tsdb/spi/plan/LeafTimeSeriesPlanNodeTest.java 
b/pinot-timeseries/pinot-timeseries-spi/src/test/java/org/apache/pinot/tsdb/spi/plan/LeafTimeSeriesPlanNodeTest.java index f439bfc028..011cb6fbc6 100644 --- a/pinot-timeseries/pinot-timeseries-spi/src/test/java/org/apache/pinot/tsdb/spi/plan/LeafTimeSeriesPlanNodeTest.java +++ b/pinot-timeseries/pinot-timeseries-spi/src/test/java/org/apache/pinot/tsdb/spi/plan/LeafTimeSeriesPlanNodeTest.java @@ -37,8 +37,8 @@ public class LeafTimeSeriesPlanNodeTest { @Test public void testGetEffectiveFilter() { TimeBuckets timeBuckets = TimeBuckets.ofSeconds(1000, Duration.ofSeconds(13), 9); - final long expectedStartTimeInFilter = 1000; - final long expectedEndTimeInFilter = 1000 + 13 * 9; + final long expectedStartTimeInFilter = timeBuckets.getTimeRangeStartExclusive(); + final long expectedEndTimeInFilter = timeBuckets.getTimeRangeEndInclusive(); final String nonEmptyFilter = "cityName = 'Chicago'"; // Case-1: No offset, and empty filter. { @@ -46,7 +46,7 @@ public class LeafTimeSeriesPlanNodeTest { new LeafTimeSeriesPlanNode(ID, Collections.emptyList(), TABLE, TIME_COLUMN, TIME_UNIT, 0L, "", "value_col", new AggInfo("SUM", null), Collections.singletonList("cityName")); assertEquals(planNode.getEffectiveFilter(timeBuckets), - "orderTime >= " + expectedStartTimeInFilter + " AND orderTime < " + expectedEndTimeInFilter); + "orderTime > " + expectedStartTimeInFilter + " AND orderTime <= " + expectedEndTimeInFilter); } // Case-2: Offset, but empty filter { @@ -54,7 +54,7 @@ public class LeafTimeSeriesPlanNodeTest { new LeafTimeSeriesPlanNode(ID, Collections.emptyList(), TABLE, TIME_COLUMN, TIME_UNIT, 123L, "", "value_col", new AggInfo("SUM", null), Collections.singletonList("cityName")); assertEquals(planNode.getEffectiveFilter(timeBuckets), - "orderTime >= " + (expectedStartTimeInFilter - 123) + " AND orderTime < " + (expectedEndTimeInFilter - 123)); + "orderTime > " + (expectedStartTimeInFilter - 123) + " AND orderTime <= " + (expectedEndTimeInFilter - 123)); } // Case-3: Offset 
and non-empty filter { @@ -62,7 +62,7 @@ public class LeafTimeSeriesPlanNodeTest { new LeafTimeSeriesPlanNode(ID, Collections.emptyList(), TABLE, TIME_COLUMN, TIME_UNIT, 123L, nonEmptyFilter, "value_col", new AggInfo("SUM", null), Collections.singletonList("cityName")); assertEquals(planNode.getEffectiveFilter(timeBuckets), - String.format("(%s) AND (orderTime >= %s AND orderTime < %s)", nonEmptyFilter, + String.format("(%s) AND (orderTime > %s AND orderTime <= %s)", nonEmptyFilter, (expectedStartTimeInFilter - 123), (expectedEndTimeInFilter - 123))); } // Case-4: Offset, and non-empty filter, and time-unit that is not seconds @@ -71,7 +71,7 @@ public class LeafTimeSeriesPlanNodeTest { new LeafTimeSeriesPlanNode(ID, Collections.emptyList(), TABLE, TIME_COLUMN, TimeUnit.MILLISECONDS, 123L, nonEmptyFilter, "value_col", new AggInfo("SUM", null), Collections.singletonList("cityName")); assertEquals(planNode.getEffectiveFilter(timeBuckets), - String.format("(%s) AND (orderTime >= %s AND orderTime < %s)", nonEmptyFilter, + String.format("(%s) AND (orderTime > %s AND orderTime <= %s)", nonEmptyFilter, (expectedStartTimeInFilter * 1000 - 123 * 1000), (expectedEndTimeInFilter * 1000 - 123 * 1000))); } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org