This is an automated email from the ASF dual-hosted git repository.

ankitsultana pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new c5fbe5b23d [timeseries] Make Time Buckets Half Open on the Left 
(#14413)
c5fbe5b23d is described below

commit c5fbe5b23d4ab7ac7819b7a9744eb9e3b9562f8c
Author: Ankit Sultana <ankitsult...@uber.com>
AuthorDate: Mon Nov 11 15:24:47 2024 -0600

    [timeseries] Make Time Buckets Half Open on the Left (#14413)
---
 .../timeseries/TimeSeriesAggregationOperator.java  | 38 ++++-----
 .../TimeSeriesAggregationOperatorTest.java         | 96 ++++++++++++++++++++++
 .../core/query/executor/QueryExecutorTest.java     |  6 +-
 .../query/service/dispatch/QueryDispatcher.java    | 11 ++-
 .../PhysicalTimeSeriesPlanVisitorTest.java         |  4 +-
 .../org/apache/pinot/tsdb/spi/TimeBuckets.java     | 37 ++++++---
 .../tsdb/spi/plan/LeafTimeSeriesPlanNode.java      | 12 ++-
 .../org/apache/pinot/tsdb/spi/TimeBucketsTest.java | 53 ++++++++++++
 .../tsdb/spi/plan/LeafTimeSeriesPlanNodeTest.java  | 12 +--
 9 files changed, 214 insertions(+), 55 deletions(-)

diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/operator/timeseries/TimeSeriesAggregationOperator.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/operator/timeseries/TimeSeriesAggregationOperator.java
index 25ee168ef8..fd895a4607 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/operator/timeseries/TimeSeriesAggregationOperator.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/operator/timeseries/TimeSeriesAggregationOperator.java
@@ -18,6 +18,7 @@
  */
 package org.apache.pinot.core.operator.timeseries;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.ImmutableList;
 import java.time.Duration;
 import java.util.HashMap;
@@ -51,7 +52,7 @@ public class TimeSeriesAggregationOperator extends 
BaseOperator<TimeSeriesResult
   private static final String EXPLAIN_NAME = "TIME_SERIES_AGGREGATION";
   private final String _timeColumn;
   private final TimeUnit _storedTimeUnit;
-  private final Long _timeOffset;
+  private final long _timeOffset;
   private final AggInfo _aggInfo;
   private final ExpressionContext _valueExpression;
   private final List<String> _groupByExpressions;
@@ -62,7 +63,7 @@ public class TimeSeriesAggregationOperator extends 
BaseOperator<TimeSeriesResult
   public TimeSeriesAggregationOperator(
       String timeColumn,
       TimeUnit timeUnit,
-      Long timeOffsetSeconds,
+      @Nullable Long timeOffsetSeconds,
       AggInfo aggInfo,
       ExpressionContext valueExpression,
       List<String> groupByExpressions,
@@ -71,7 +72,7 @@ public class TimeSeriesAggregationOperator extends 
BaseOperator<TimeSeriesResult
       TimeSeriesBuilderFactory seriesBuilderFactory) {
     _timeColumn = timeColumn;
     _storedTimeUnit = timeUnit;
-    _timeOffset = timeUnit.convert(Duration.ofSeconds(timeOffsetSeconds));
+    _timeOffset = timeOffsetSeconds == null ? 0L : 
timeUnit.convert(Duration.ofSeconds(timeOffsetSeconds));
     _aggInfo = aggInfo;
     _valueExpression = valueExpression;
     _groupByExpressions = groupByExpressions;
@@ -89,10 +90,8 @@ public class TimeSeriesAggregationOperator extends 
BaseOperator<TimeSeriesResult
       // TODO: This is quite unoptimized and allocates liberally
       BlockValSet blockValSet = valueBlock.getBlockValueSet(_timeColumn);
       long[] timeValues = blockValSet.getLongValuesSV();
-      if (_timeOffset != null && _timeOffset != 0L) {
-        timeValues = applyTimeshift(_timeOffset, timeValues, numDocs);
-      }
-      int[] timeValueIndexes = getTimeValueIndex(timeValues, _storedTimeUnit, 
numDocs);
+      applyTimeOffset(timeValues, numDocs);
+      int[] timeValueIndexes = getTimeValueIndex(timeValues, numDocs);
       Object[][] tagValues = new Object[_groupByExpressions.size()][];
       for (int i = 0; i < _groupByExpressions.size(); i++) {
         blockValSet = valueBlock.getBlockValueSet(_groupByExpressions.get(i));
@@ -152,23 +151,26 @@ public class TimeSeriesAggregationOperator extends 
BaseOperator<TimeSeriesResult
     return new ExecutionStatistics(0, 0, 0, 0);
   }
 
-  private int[] getTimeValueIndex(long[] actualTimeValues, TimeUnit timeUnit, 
int numDocs) {
-    if (timeUnit == TimeUnit.MILLISECONDS) {
+  @VisibleForTesting
+  protected int[] getTimeValueIndex(long[] actualTimeValues, int numDocs) {
+    if (_storedTimeUnit == TimeUnit.MILLISECONDS) {
       return getTimeValueIndexMillis(actualTimeValues, numDocs);
     }
     int[] timeIndexes = new int[numDocs];
+    final long reference = _timeBuckets.getTimeRangeStartExclusive();
+    final long divisor = _timeBuckets.getBucketSize().getSeconds();
     for (int index = 0; index < numDocs; index++) {
-      timeIndexes[index] = (int) ((actualTimeValues[index] - 
_timeBuckets.getStartTime())
-          / _timeBuckets.getBucketSize().getSeconds());
+      timeIndexes[index] = (int) ((actualTimeValues[index] - reference - 1) / 
divisor);
     }
     return timeIndexes;
   }
 
   private int[] getTimeValueIndexMillis(long[] actualTimeValues, int numDocs) {
     int[] timeIndexes = new int[numDocs];
+    final long reference = _timeBuckets.getTimeRangeStartExclusive() * 1000L;
+    final long divisor = _timeBuckets.getBucketSize().toMillis();
     for (int index = 0; index < numDocs; index++) {
-      timeIndexes[index] = (int) ((actualTimeValues[index] - 
_timeBuckets.getStartTime() * 1000L)
-          / _timeBuckets.getBucketSize().toMillis());
+      timeIndexes[index] = (int) ((actualTimeValues[index] - reference - 1) / 
divisor);
     }
     return timeIndexes;
   }
@@ -240,14 +242,12 @@ public class TimeSeriesAggregationOperator extends 
BaseOperator<TimeSeriesResult
     }
   }
 
-  public static long[] applyTimeshift(long timeshift, long[] timeValues, int 
numDocs) {
-    if (timeshift == 0) {
-      return timeValues;
+  private void applyTimeOffset(long[] timeValues, int numDocs) {
+    if (_timeOffset == 0L) {
+      return;
     }
-    long[] shiftedTimeValues = new long[numDocs];
     for (int index = 0; index < numDocs; index++) {
-      shiftedTimeValues[index] = timeValues[index] + timeshift;
+      timeValues[index] = timeValues[index] + _timeOffset;
     }
-    return shiftedTimeValues;
   }
 }
diff --git 
a/pinot-core/src/test/java/org/apache/pinot/core/operator/timeseries/TimeSeriesAggregationOperatorTest.java
 
b/pinot-core/src/test/java/org/apache/pinot/core/operator/timeseries/TimeSeriesAggregationOperatorTest.java
new file mode 100644
index 0000000000..888d0658e9
--- /dev/null
+++ 
b/pinot-core/src/test/java/org/apache/pinot/core/operator/timeseries/TimeSeriesAggregationOperatorTest.java
@@ -0,0 +1,96 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.operator.timeseries;
+
+import java.time.Duration;
+import java.util.Collections;
+import java.util.concurrent.TimeUnit;
+import org.apache.pinot.common.request.context.ExpressionContext;
+import org.apache.pinot.core.operator.BaseProjectOperator;
+import org.apache.pinot.tsdb.spi.AggInfo;
+import org.apache.pinot.tsdb.spi.TimeBuckets;
+import org.apache.pinot.tsdb.spi.series.TimeSeriesBuilderFactory;
+import org.testng.annotations.Test;
+
+import static org.mockito.Mockito.*;
+import static org.testng.Assert.*;
+
+
+public class TimeSeriesAggregationOperatorTest {
+  private static final String DUMMY_TIME_COLUMN = "someTimeColumn";
+  private static final AggInfo AGG_INFO = new AggInfo("", 
Collections.emptyMap());
+  private static final ExpressionContext VALUE_EXPRESSION = 
ExpressionContext.forIdentifier("someValueColumn");
+
+  @Test
+  public void testGetTimeValueIndexForSeconds() {
+    /*
+     * TimeBuckets: [10_000, 10_100, 10_200, ..., 10_900]
+     * storedTimeValues: [9_999, 10_000, 9_999, 9_901, 10_100, 10_899, 10_900]
+     * expected indexes: [0, 0, 0, 0, 1, 9, 9]
+     */
+    final int[] expectedIndexes = new int[]{0, 0, 0, 0, 1, 9, 9};
+    final TimeUnit storedTimeUnit = TimeUnit.SECONDS;
+    TimeBuckets timeBuckets = TimeBuckets.ofSeconds(10_000, 
Duration.ofSeconds(100), 10);
+    TimeSeriesAggregationOperator aggregationOperator = 
buildOperator(storedTimeUnit, timeBuckets);
+    long[] storedTimeValues = new long[]{9_999L, 10_000L, 9_999L, 9_901L, 
10_100L, 10_899L, 10_900L};
+    int[] indexes = aggregationOperator.getTimeValueIndex(storedTimeValues, 
storedTimeValues.length);
+    assertEquals(indexes, expectedIndexes);
+  }
+
+  @Test
+  public void testGetTimeValueIndexForMillis() {
+    /*
+     * TimeBuckets: [10_000, 10_100, 10_200, ..., 10_900]
+     * storedTimeValues: [9_999_000, 10_000_000, 10_500_000, 10_899_999, 
10_800_001, 10_900_000]
+     * expected indexes: [0, 0, 5, 9, 9, 9]
+     */
+    final int[] expectedIndexes = new int[]{0, 0, 5, 9, 9, 9};
+    final TimeUnit storedTimeUnit = TimeUnit.MILLISECONDS;
+    TimeBuckets timeBuckets = TimeBuckets.ofSeconds(10_000, 
Duration.ofSeconds(100), 10);
+    TimeSeriesAggregationOperator aggregationOperator = 
buildOperator(storedTimeUnit, timeBuckets);
+    long[] storedTimeValues = new long[]{9_999_000L, 10_000_000L, 10_500_000L, 
10_899_999L, 10_800_001L, 10_900_000L};
+    int[] indexes = aggregationOperator.getTimeValueIndex(storedTimeValues, 
storedTimeValues.length);
+    assertEquals(indexes, expectedIndexes);
+  }
+
+  @Test
+  public void testGetTimeValueIndexOutOfBounds() {
+    final TimeUnit storedTimeUnit = TimeUnit.SECONDS;
+    final int numTimeBuckets = 10;
+    final int windowSeconds = 100;
+    TimeBuckets timeBuckets = TimeBuckets.ofSeconds(10_000, 
Duration.ofSeconds(windowSeconds), numTimeBuckets);
+    TimeSeriesAggregationOperator aggregationOperator = 
buildOperator(storedTimeUnit, timeBuckets);
+    testOutOfBoundsTimeValueIndex(new long[]{8_000}, numTimeBuckets, 
aggregationOperator);
+    testOutOfBoundsTimeValueIndex(new 
long[]{timeBuckets.getTimeRangeEndInclusive() + 1}, numTimeBuckets,
+        aggregationOperator);
+  }
+
+  private void testOutOfBoundsTimeValueIndex(long[] storedTimeValues, int 
numTimeBuckets,
+      TimeSeriesAggregationOperator aggOperator) {
+    assertEquals(storedTimeValues.length, 1, "Misconfigured test: pass single 
stored time value");
+    int[] indexes = aggOperator.getTimeValueIndex(storedTimeValues, 
storedTimeValues.length);
+    assertTrue(indexes[0] < 0 || indexes[0] >= numTimeBuckets, "Expected time 
index to spill beyond valid range");
+  }
+
+  private TimeSeriesAggregationOperator buildOperator(TimeUnit storedTimeUnit, 
TimeBuckets timeBuckets) {
+    return new TimeSeriesAggregationOperator(
+        DUMMY_TIME_COLUMN, storedTimeUnit, 0L, AGG_INFO, VALUE_EXPRESSION, 
Collections.emptyList(),
+        timeBuckets, mock(BaseProjectOperator.class), 
mock(TimeSeriesBuilderFactory.class));
+  }
+}
diff --git 
a/pinot-core/src/test/java/org/apache/pinot/core/query/executor/QueryExecutorTest.java
 
b/pinot-core/src/test/java/org/apache/pinot/core/query/executor/QueryExecutorTest.java
index 457e916b6d..404bd3efc3 100644
--- 
a/pinot-core/src/test/java/org/apache/pinot/core/query/executor/QueryExecutorTest.java
+++ 
b/pinot-core/src/test/java/org/apache/pinot/core/query/executor/QueryExecutorTest.java
@@ -82,6 +82,7 @@ import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertFalse;
+import static org.testng.Assert.assertNull;
 import static org.testng.Assert.assertTrue;
 
 
@@ -217,7 +218,7 @@ public class QueryExecutorTest {
 
   @Test
   public void testTimeSeriesSumQuery() {
-    TimeBuckets timeBuckets = 
TimeBuckets.ofSeconds(TIME_SERIES_TEST_START_TIME, Duration.ofHours(2), 1);
+    TimeBuckets timeBuckets = 
TimeBuckets.ofSeconds(TIME_SERIES_TEST_START_TIME, Duration.ofHours(2), 2);
     ExpressionContext valueExpression = 
ExpressionContext.forIdentifier("orderAmount");
     TimeSeriesContext timeSeriesContext =
         new TimeSeriesContext(TIME_SERIES_LANGUAGE_NAME, 
TIME_SERIES_TIME_COL_NAME, TimeUnit.SECONDS, timeBuckets,
@@ -230,7 +231,8 @@ public class QueryExecutorTest {
     TimeSeriesResultsBlock resultsBlock = (TimeSeriesResultsBlock) 
instanceResponse.getResultsBlock();
     TimeSeriesBlock timeSeriesBlock = 
resultsBlock.getTimeSeriesBuilderBlock().build();
     assertEquals(timeSeriesBlock.getSeriesMap().size(), 1);
-    
assertEquals(timeSeriesBlock.getSeriesMap().values().iterator().next().get(0).getValues()[0],
 29885544.0);
+    
assertNull(timeSeriesBlock.getSeriesMap().values().iterator().next().get(0).getValues()[0]);
+    
assertEquals(timeSeriesBlock.getSeriesMap().values().iterator().next().get(0).getValues()[1],
 29885544.0);
   }
 
   @Test
diff --git 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/dispatch/QueryDispatcher.java
 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/dispatch/QueryDispatcher.java
index 1abe32d11b..fc10c96346 100644
--- 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/dispatch/QueryDispatcher.java
+++ 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/dispatch/QueryDispatcher.java
@@ -82,6 +82,7 @@ import 
org.apache.pinot.tsdb.planner.TimeSeriesPlanConstants.WorkerRequestMetada
 import 
org.apache.pinot.tsdb.planner.TimeSeriesPlanConstants.WorkerResponseMetadataKeys;
 import org.apache.pinot.tsdb.planner.physical.TimeSeriesDispatchablePlan;
 import org.apache.pinot.tsdb.planner.physical.TimeSeriesQueryServerInstance;
+import org.apache.pinot.tsdb.spi.TimeBuckets;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -301,13 +302,11 @@ public class QueryDispatcher {
 
   Map<String, String> 
initializeTimeSeriesMetadataMap(TimeSeriesDispatchablePlan dispatchablePlan) {
     Map<String, String> result = new HashMap<>();
+    TimeBuckets timeBuckets = dispatchablePlan.getTimeBuckets();
     result.put(WorkerRequestMetadataKeys.LANGUAGE, 
dispatchablePlan.getLanguage());
-    result.put(WorkerRequestMetadataKeys.START_TIME_SECONDS,
-        Long.toString(dispatchablePlan.getTimeBuckets().getStartTime()));
-    result.put(WorkerRequestMetadataKeys.WINDOW_SECONDS,
-        
Long.toString(dispatchablePlan.getTimeBuckets().getBucketSize().getSeconds()));
-    result.put(WorkerRequestMetadataKeys.NUM_ELEMENTS,
-        
Long.toString(dispatchablePlan.getTimeBuckets().getTimeBuckets().length));
+    result.put(WorkerRequestMetadataKeys.START_TIME_SECONDS, 
Long.toString(timeBuckets.getTimeBuckets()[0]));
+    result.put(WorkerRequestMetadataKeys.WINDOW_SECONDS, 
Long.toString(timeBuckets.getBucketSize().getSeconds()));
+    result.put(WorkerRequestMetadataKeys.NUM_ELEMENTS, 
Long.toString(timeBuckets.getTimeBuckets().length));
     for (Map.Entry<String, List<String>> entry : 
dispatchablePlan.getPlanIdToSegments().entrySet()) {
       
result.put(WorkerRequestMetadataKeys.encodeSegmentListKey(entry.getKey()), 
String.join(",", entry.getValue()));
     }
diff --git 
a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/timeseries/PhysicalTimeSeriesPlanVisitorTest.java
 
b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/timeseries/PhysicalTimeSeriesPlanVisitorTest.java
index 8f877aec01..c9a687c5ff 100644
--- 
a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/timeseries/PhysicalTimeSeriesPlanVisitorTest.java
+++ 
b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/timeseries/PhysicalTimeSeriesPlanVisitorTest.java
@@ -54,7 +54,7 @@ public class PhysicalTimeSeriesPlanVisitorTest {
       assertEquals(queryContext.getTimeSeriesContext().getTimeColumn(), 
timeColumn);
       
assertEquals(queryContext.getTimeSeriesContext().getValueExpression().getIdentifier(),
 "orderCount");
       assertEquals(queryContext.getFilter().toString(),
-          "(cityName = 'Chicago' AND orderTime >= '1000' AND orderTime < 
'2000')");
+          "(cityName = 'Chicago' AND orderTime > '990' AND orderTime <= 
'1990')");
     }
     // Case-2: With offset, complex group-by expression, complex value, and 
non-empty filter
     {
@@ -75,7 +75,7 @@ public class PhysicalTimeSeriesPlanVisitorTest {
       
assertEquals(queryContext.getTimeSeriesContext().getValueExpression().toString(),
 "times(orderCount,'2')");
       assertNotNull(queryContext.getFilter());
       assertEquals(queryContext.getFilter().toString(),
-          "(cityName = 'Chicago' AND orderTime >= '990' AND orderTime < 
'1990')");
+          "(cityName = 'Chicago' AND orderTime > '980' AND orderTime <= 
'1980')");
     }
   }
 }
diff --git 
a/pinot-timeseries/pinot-timeseries-spi/src/main/java/org/apache/pinot/tsdb/spi/TimeBuckets.java
 
b/pinot-timeseries/pinot-timeseries-spi/src/main/java/org/apache/pinot/tsdb/spi/TimeBuckets.java
index a92eb3bcc2..0dba0ec828 100644
--- 
a/pinot-timeseries/pinot-timeseries-spi/src/main/java/org/apache/pinot/tsdb/spi/TimeBuckets.java
+++ 
b/pinot-timeseries/pinot-timeseries-spi/src/main/java/org/apache/pinot/tsdb/spi/TimeBuckets.java
@@ -25,7 +25,7 @@ import java.util.Objects;
 
 /**
  * Time buckets used for query execution. Each element (say x) in the {@link 
#getTimeBuckets()} array represents a
- * time-range which is half open on the right side: [x, x + 
bucketSize.getSeconds()). Some query languages allow some
+ * time-range which is half open on the left side: (x - 
bucketSize.getSeconds(), x]. Some query languages allow some
  * operators to mutate the time-buckets on the fly, so it is not guaranteed 
that the time resolution and/or range
  * will be the same across all operators. For instance, Uber's M3QL supports a 
"summarize 1h sum" operator which will
  * change the bucket resolution to 1 hour for all subsequent operators.
@@ -47,16 +47,16 @@ public class TimeBuckets {
     return _bucketSize;
   }
 
-  public long getStartTime() {
-    return _timeBuckets[0];
+  public long getTimeRangeStartExclusive() {
+    return _timeBuckets[0] - _bucketSize.getSeconds();
   }
 
-  public long getEndTime() {
+  public long getTimeRangeEndInclusive() {
     return _timeBuckets[_timeBuckets.length - 1];
   }
 
   public long getRangeSeconds() {
-    return _timeBuckets[_timeBuckets.length - 1] - _timeBuckets[0] + 
_bucketSize.getSeconds();
+    return getTimeRangeEndInclusive() - getTimeRangeStartExclusive();
   }
 
   public int getNumBuckets() {
@@ -67,13 +67,12 @@ public class TimeBuckets {
     if (_timeBuckets.length == 0) {
       return -1;
     }
-    if (timeValue < _timeBuckets[0]) {
+    if (timeValue <= getTimeRangeStartExclusive() || timeValue > 
getTimeRangeEndInclusive()) {
       return -1;
     }
-    if (timeValue >= _timeBuckets[_timeBuckets.length - 1] + 
_bucketSize.getSeconds()) {
-      return -1;
-    }
-    return (int) ((timeValue - _timeBuckets[0]) / _bucketSize.getSeconds());
+    long offsetFromRangeStart = timeValue - getTimeRangeStartExclusive();
+    // Subtract 1 from the offset because we have intervals half-open on the 
left.
+    return (int) ((offsetFromRangeStart - 1) / _bucketSize.getSeconds());
   }
 
   @Override
@@ -82,7 +81,8 @@ public class TimeBuckets {
       return false;
     }
     TimeBuckets other = (TimeBuckets) o;
-    return this.getStartTime() == other.getStartTime() && this.getEndTime() == 
other.getEndTime()
+    return this.getTimeRangeStartExclusive() == 
other.getTimeRangeStartExclusive()
+        && this.getTimeRangeEndInclusive() == other.getTimeRangeEndInclusive()
         && this.getBucketSize().equals(other.getBucketSize());
   }
 
@@ -93,11 +93,22 @@ public class TimeBuckets {
     return result;
   }
 
-  public static TimeBuckets ofSeconds(long startTimeSeconds, Duration 
bucketSize, int numElements) {
+  /**
+   * Creates time-buckets, with the first value in the bucket being 
firstBucketValue (FBV). The time range represented
+   * by the buckets are:
+   * <pre>
+   *   (FBV - bucketSize.getSeconds(), FBV + (numElements - 1) * 
bucketSize.getSeconds()]
+   * </pre>
+   * The raw Long[] time values are:
+   * <pre>
+   *   FBV, FBV + bucketSize.getSeconds(), ... , FBV + (numElements - 1) * 
bucketSize.getSeconds()
+   * </pre>
+   */
+  public static TimeBuckets ofSeconds(long firstBucketValue, Duration 
bucketSize, int numElements) {
     long stepSize = bucketSize.getSeconds();
     Long[] timeBuckets = new Long[numElements];
     for (int i = 0; i < numElements; i++) {
-      timeBuckets[i] = startTimeSeconds + i * stepSize;
+      timeBuckets[i] = firstBucketValue + i * stepSize;
     }
     return new TimeBuckets(timeBuckets, bucketSize);
   }
diff --git 
a/pinot-timeseries/pinot-timeseries-spi/src/main/java/org/apache/pinot/tsdb/spi/plan/LeafTimeSeriesPlanNode.java
 
b/pinot-timeseries/pinot-timeseries-spi/src/main/java/org/apache/pinot/tsdb/spi/plan/LeafTimeSeriesPlanNode.java
index 856b81622a..773d675242 100644
--- 
a/pinot-timeseries/pinot-timeseries-spi/src/main/java/org/apache/pinot/tsdb/spi/plan/LeafTimeSeriesPlanNode.java
+++ 
b/pinot-timeseries/pinot-timeseries-spi/src/main/java/org/apache/pinot/tsdb/spi/plan/LeafTimeSeriesPlanNode.java
@@ -115,14 +115,12 @@ public class LeafTimeSeriesPlanNode extends 
BaseTimeSeriesPlanNode {
 
   public String getEffectiveFilter(TimeBuckets timeBuckets) {
     String filter = _filterExpression == null ? "" : _filterExpression;
-    long startTime = 
_timeUnit.convert(Duration.ofSeconds(timeBuckets.getStartTime() - 
_offsetSeconds));
-    long endTime =
-        _timeUnit.convert(Duration.ofSeconds(
-            timeBuckets.getEndTime() + timeBuckets.getBucketSize().toSeconds() 
- _offsetSeconds));
-    String addnFilter = String.format("%s >= %d AND %s < %d", _timeColumn, 
startTime, _timeColumn, endTime);
+    long startTime = 
_timeUnit.convert(Duration.ofSeconds(timeBuckets.getTimeRangeStartExclusive() - 
_offsetSeconds));
+    long endTime = 
_timeUnit.convert(Duration.ofSeconds(timeBuckets.getTimeRangeEndInclusive() - 
_offsetSeconds));
+    String timeFilter = String.format("%s > %d AND %s <= %d", _timeColumn, 
startTime, _timeColumn, endTime);
     if (filter.strip().isEmpty()) {
-      return addnFilter;
+      return timeFilter;
     }
-    return String.format("(%s) AND (%s)", filter, addnFilter);
+    return String.format("(%s) AND (%s)", filter, timeFilter);
   }
 }
diff --git 
a/pinot-timeseries/pinot-timeseries-spi/src/test/java/org/apache/pinot/tsdb/spi/TimeBucketsTest.java
 
b/pinot-timeseries/pinot-timeseries-spi/src/test/java/org/apache/pinot/tsdb/spi/TimeBucketsTest.java
new file mode 100644
index 0000000000..27d4ce7543
--- /dev/null
+++ 
b/pinot-timeseries/pinot-timeseries-spi/src/test/java/org/apache/pinot/tsdb/spi/TimeBucketsTest.java
@@ -0,0 +1,53 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.tsdb.spi;
+
+import java.time.Duration;
+import org.testng.annotations.Test;
+
+import static org.testng.Assert.*;
+
+
+public class TimeBucketsTest {
+  @Test
+  public void testTimeBucketsSemantics() {
+    /*
+     * time-bucket values: [10_000, 10_100, 10_200, ... , 10_900]
+     */
+    final int firstBucketValue = 10_000;
+    final int bucketSize = 100;
+    final int numElements = 10;
+    TimeBuckets timeBuckets = TimeBuckets.ofSeconds(firstBucketValue, 
Duration.ofSeconds(bucketSize), numElements);
+    assertEquals(timeBuckets.getNumBuckets(), numElements);
+    assertEquals(timeBuckets.getBucketSize().getSeconds(), bucketSize);
+    assertEquals(timeBuckets.getTimeRangeStartExclusive(), firstBucketValue - 
bucketSize);
+    assertEquals(timeBuckets.getTimeRangeEndInclusive(), firstBucketValue + 
(numElements - 1) * bucketSize);
+    assertEquals(timeBuckets.getRangeSeconds(),
+        timeBuckets.getTimeRangeEndInclusive() - 
timeBuckets.getTimeRangeStartExclusive());
+    assertEquals(timeBuckets.resolveIndex(10_000), 0);
+    assertEquals(timeBuckets.resolveIndex(9_999), 0);
+    assertEquals(timeBuckets.resolveIndex(9_901), 0);
+    assertEquals(timeBuckets.resolveIndex(10_100), 1);
+    assertEquals(timeBuckets.resolveIndex(10_101), 2);
+    assertEquals(timeBuckets.resolveIndex(10_900), 9);
+    // Test out of bound indexes
+    assertEquals(timeBuckets.resolveIndex(9_900), -1);
+    assertEquals(timeBuckets.resolveIndex(10_901), -1);
+  }
+}
diff --git 
a/pinot-timeseries/pinot-timeseries-spi/src/test/java/org/apache/pinot/tsdb/spi/plan/LeafTimeSeriesPlanNodeTest.java
 
b/pinot-timeseries/pinot-timeseries-spi/src/test/java/org/apache/pinot/tsdb/spi/plan/LeafTimeSeriesPlanNodeTest.java
index f439bfc028..011cb6fbc6 100644
--- 
a/pinot-timeseries/pinot-timeseries-spi/src/test/java/org/apache/pinot/tsdb/spi/plan/LeafTimeSeriesPlanNodeTest.java
+++ 
b/pinot-timeseries/pinot-timeseries-spi/src/test/java/org/apache/pinot/tsdb/spi/plan/LeafTimeSeriesPlanNodeTest.java
@@ -37,8 +37,8 @@ public class LeafTimeSeriesPlanNodeTest {
   @Test
   public void testGetEffectiveFilter() {
     TimeBuckets timeBuckets = TimeBuckets.ofSeconds(1000, 
Duration.ofSeconds(13), 9);
-    final long expectedStartTimeInFilter = 1000;
-    final long expectedEndTimeInFilter = 1000 + 13 * 9;
+    final long expectedStartTimeInFilter = 
timeBuckets.getTimeRangeStartExclusive();
+    final long expectedEndTimeInFilter = 
timeBuckets.getTimeRangeEndInclusive();
     final String nonEmptyFilter = "cityName = 'Chicago'";
     // Case-1: No offset, and empty filter.
     {
@@ -46,7 +46,7 @@ public class LeafTimeSeriesPlanNodeTest {
           new LeafTimeSeriesPlanNode(ID, Collections.emptyList(), TABLE, 
TIME_COLUMN, TIME_UNIT, 0L, "", "value_col",
               new AggInfo("SUM", null), Collections.singletonList("cityName"));
       assertEquals(planNode.getEffectiveFilter(timeBuckets),
-          "orderTime >= " + expectedStartTimeInFilter + " AND orderTime < " + 
expectedEndTimeInFilter);
+          "orderTime > " + expectedStartTimeInFilter + " AND orderTime <= " + 
expectedEndTimeInFilter);
     }
     // Case-2: Offset, but empty filter
     {
@@ -54,7 +54,7 @@ public class LeafTimeSeriesPlanNodeTest {
           new LeafTimeSeriesPlanNode(ID, Collections.emptyList(), TABLE, 
TIME_COLUMN, TIME_UNIT, 123L, "", "value_col",
               new AggInfo("SUM", null), Collections.singletonList("cityName"));
       assertEquals(planNode.getEffectiveFilter(timeBuckets),
-          "orderTime >= " + (expectedStartTimeInFilter - 123) + " AND 
orderTime < " + (expectedEndTimeInFilter - 123));
+          "orderTime > " + (expectedStartTimeInFilter - 123) + " AND orderTime 
<= " + (expectedEndTimeInFilter - 123));
     }
     // Case-3: Offset and non-empty filter
     {
@@ -62,7 +62,7 @@ public class LeafTimeSeriesPlanNodeTest {
           new LeafTimeSeriesPlanNode(ID, Collections.emptyList(), TABLE, 
TIME_COLUMN, TIME_UNIT, 123L, nonEmptyFilter,
               "value_col", new AggInfo("SUM", null), 
Collections.singletonList("cityName"));
       assertEquals(planNode.getEffectiveFilter(timeBuckets),
-          String.format("(%s) AND (orderTime >= %s AND orderTime < %s)", 
nonEmptyFilter,
+          String.format("(%s) AND (orderTime > %s AND orderTime <= %s)", 
nonEmptyFilter,
               (expectedStartTimeInFilter - 123), (expectedEndTimeInFilter - 
123)));
     }
     // Case-4: Offset, and non-empty filter, and time-unit that is not seconds
@@ -71,7 +71,7 @@ public class LeafTimeSeriesPlanNodeTest {
           new LeafTimeSeriesPlanNode(ID, Collections.emptyList(), TABLE, 
TIME_COLUMN, TimeUnit.MILLISECONDS, 123L,
               nonEmptyFilter, "value_col", new AggInfo("SUM", null), 
Collections.singletonList("cityName"));
       assertEquals(planNode.getEffectiveFilter(timeBuckets),
-          String.format("(%s) AND (orderTime >= %s AND orderTime < %s)", 
nonEmptyFilter,
+          String.format("(%s) AND (orderTime > %s AND orderTime <= %s)", 
nonEmptyFilter,
               (expectedStartTimeInFilter * 1000 - 123 * 1000), 
(expectedEndTimeInFilter * 1000 - 123 * 1000)));
     }
   }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org

Reply via email to