This is an automated email from the ASF dual-hosted git repository.

jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new b828280882 Part-1: Pinot Timeseries Engine SPI (#13885)
b828280882 is described below

commit b8282808826b143cabcbf59fa408493fe9d03665
Author: Ankit Sultana <ankitsult...@uber.com>
AuthorDate: Tue Sep 10 02:11:09 2024 +0530

    Part-1: Pinot Timeseries Engine SPI (#13885)
---
 .gitignore                                         |   3 +-
 pinot-timeseries/pinot-timeseries-spi/pom.xml      |  49 ++++++++
 .../java/org/apache/pinot/tsdb/spi/AggInfo.java    |  44 +++++++
 .../tsdb/spi/PinotTimeSeriesConfiguration.java     |  57 +++++++++
 .../pinot/tsdb/spi/RangeTimeSeriesRequest.java     | 103 +++++++++++++++++
 .../org/apache/pinot/tsdb/spi/TimeBuckets.java     | 104 +++++++++++++++++
 .../tsdb/spi/TimeSeriesLogicalPlanResult.java      |  43 +++++++
 .../pinot/tsdb/spi/TimeSeriesLogicalPlanner.java   |  39 +++++++
 .../tsdb/spi/operator/BaseTimeSeriesOperator.java  |  65 +++++++++++
 .../tsdb/spi/plan/BaseTimeSeriesPlanNode.java      |  55 +++++++++
 .../spi/plan/ScanFilterAndProjectPlanNode.java     | 127 +++++++++++++++++++++
 .../tsdb/spi/plan/serde/TimeSeriesPlanSerde.java   |  95 +++++++++++++++
 .../tsdb/spi/series/BaseTimeSeriesBuilder.java     |  75 ++++++++++++
 .../apache/pinot/tsdb/spi/series/TimeSeries.java   | 113 ++++++++++++++++++
 .../pinot/tsdb/spi/series/TimeSeriesBlock.java     |  48 ++++++++
 .../tsdb/spi/series/TimeSeriesBuilderFactory.java  |  36 ++++++
 .../series/TimeSeriesBuilderFactoryProvider.java   |  63 ++++++++++
 .../spi/series/builders/MaxTimeSeriesBuilder.java  |  57 +++++++++
 .../spi/series/builders/MinTimeSeriesBuilder.java  |  57 +++++++++
 .../series/builders/SummingTimeSeriesBuilder.java  |  55 +++++++++
 .../spi/plan/serde/TimeSeriesPlanSerdeTest.java    |  52 +++++++++
 pinot-timeseries/pom.xml                           |  44 +++++++
 pom.xml                                            |   1 +
 23 files changed, 1384 insertions(+), 1 deletion(-)

diff --git a/.gitignore b/.gitignore
index ca69e90d0e..00404492f8 100644
--- a/.gitignore
+++ b/.gitignore
@@ -45,7 +45,8 @@ yarn-error.log*
 quickstart*
 
 #build symlink directory
-build*
+build
+build/*
 
 #helm related files
 kubernetes/helm/**/charts/
diff --git a/pinot-timeseries/pinot-timeseries-spi/pom.xml b/pinot-timeseries/pinot-timeseries-spi/pom.xml
new file mode 100644
index 0000000000..c21e9971d5
--- /dev/null
+++ b/pinot-timeseries/pinot-timeseries-spi/pom.xml
@@ -0,0 +1,49 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+
+    Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied.  See the License for the
+    specific language governing permissions and limitations
+    under the License.
+
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+  <modelVersion>4.0.0</modelVersion>
+  <parent>
+    <groupId>org.apache.pinot</groupId>
+    <artifactId>pinot</artifactId>
+    <version>1.3.0-SNAPSHOT</version>
+    <relativePath>../../pom.xml</relativePath>
+  </parent>
+
+  <artifactId>pinot-timeseries-spi</artifactId>
+
+  <properties>
+    <pinot.root>${basedir}/../..</pinot.root>
+  </properties>
+  <dependencies>
+    <dependency>
+      <groupId>org.apache.pinot</groupId>
+      <artifactId>pinot-spi</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.testng</groupId>
+      <artifactId>testng</artifactId>
+      <scope>test</scope>
+    </dependency>
+  </dependencies>
+</project>
diff --git a/pinot-timeseries/pinot-timeseries-spi/src/main/java/org/apache/pinot/tsdb/spi/AggInfo.java b/pinot-timeseries/pinot-timeseries-spi/src/main/java/org/apache/pinot/tsdb/spi/AggInfo.java
new file mode 100644
index 0000000000..03d9cc8aa9
--- /dev/null
+++ b/pinot-timeseries/pinot-timeseries-spi/src/main/java/org/apache/pinot/tsdb/spi/AggInfo.java
@@ -0,0 +1,44 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.tsdb.spi;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.google.common.base.Preconditions;
+
+
+/**
+ * AggInfo is used to represent the aggregation function. Aggregation functions are simply stored as a string,
+ * since time-series languages are allowed to implement their own aggregation functions.
+ * TODO: We will likely be adding more parameters to this. One candidate is partial/full aggregation information or
+ *   aggregation result type to allow for intermediate result types.
+ */
+public class AggInfo {
+  private final String _aggFunction;
+
+  @JsonCreator
+  public AggInfo(@JsonProperty("aggFunction") String aggFunction) {
+    Preconditions.checkNotNull(aggFunction, "Received null aggFunction in AggInfo");
+    _aggFunction = aggFunction;
+  }
+
+  public String getAggFunction() {
+    return _aggFunction;
+  }
+}
diff --git a/pinot-timeseries/pinot-timeseries-spi/src/main/java/org/apache/pinot/tsdb/spi/PinotTimeSeriesConfiguration.java b/pinot-timeseries/pinot-timeseries-spi/src/main/java/org/apache/pinot/tsdb/spi/PinotTimeSeriesConfiguration.java
new file mode 100644
index 0000000000..d0fac4369f
--- /dev/null
+++ b/pinot-timeseries/pinot-timeseries-spi/src/main/java/org/apache/pinot/tsdb/spi/PinotTimeSeriesConfiguration.java
@@ -0,0 +1,57 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.tsdb.spi;
+
+import org.apache.pinot.tsdb.spi.series.TimeSeriesBuilderFactory;
+
+
+public class PinotTimeSeriesConfiguration {
+  private PinotTimeSeriesConfiguration() {
+  }
+
+  public static final String CONFIG_PREFIX = "pinot.timeseries";
+  private static final String ENABLE_LANGUAGES_SUFFIX = ".languages";
+  private static final String SERIES_BUILDER_FACTORY_SUFFIX = ".series.builder.factory";
+  private static final String LOGICAL_PLANNER_CLASS_SUFFIX = ".logical.planner.class";
+
+  /**
+   * Config key that controls which time-series languages are enabled in a given Pinot cluster.
+   */
+  public static String getEnabledLanguagesConfigKey() {
+    return CONFIG_PREFIX + ENABLE_LANGUAGES_SUFFIX;
+  }
+
+  /**
+   * Returns the config key which determines the class name for the {@link TimeSeriesBuilderFactory} to be used for a
+   * given language. Each language can have its own {@link TimeSeriesBuilderFactory}, which allows each language to
+   * support custom time-series functions.
+   */
+  public static String getSeriesBuilderFactoryConfigKey(String language) {
+    return CONFIG_PREFIX + "." + language + SERIES_BUILDER_FACTORY_SUFFIX;
+  }
+
+  /**
+   * Returns config key which determines the class name for the {@link TimeSeriesLogicalPlanner} to be used for a given
+   * language. Pinot broker will load this logical planner on start-up dynamically. This is called for each language
+   * configured via {@link #getEnabledLanguagesConfigKey()}.
+   */
+  public static String getLogicalPlannerConfigKey(String language) {
+    return CONFIG_PREFIX + "." + language + LOGICAL_PLANNER_CLASS_SUFFIX;
+  }
+}
diff --git a/pinot-timeseries/pinot-timeseries-spi/src/main/java/org/apache/pinot/tsdb/spi/RangeTimeSeriesRequest.java b/pinot-timeseries/pinot-timeseries-spi/src/main/java/org/apache/pinot/tsdb/spi/RangeTimeSeriesRequest.java
new file mode 100644
index 0000000000..2c4fc045a4
--- /dev/null
+++ b/pinot-timeseries/pinot-timeseries-spi/src/main/java/org/apache/pinot/tsdb/spi/RangeTimeSeriesRequest.java
@@ -0,0 +1,103 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.tsdb.spi;
+
+import com.google.common.base.Preconditions;
+import java.time.Duration;
+
+
+/**
+ * A time-series request received by the Pinot Broker. This is passed to the {@link TimeSeriesLogicalPlanner} so
+ * each query language can parse and plan the query based on their spec.
+ * <br/>
+ * <br/>
+ * <b>Notes:</b>
+ * <ul>
+ *   <li>[start, end] are both inclusive.</li>
+ *   <li>
+ *     The result can contain time values outside [start, end], though we generally recommend to keep your results
+ *     within the requested range. This decision is left to the time-series query language implementations. In some
+ *     cases, returning data outside the requested time-range can help (e.g. for debugging purposes when you are
+ *     computing moving 1d sum but are only looking at data for the last 12 hours).
+ *   </li>
+ *   <li>stepSeconds is used to define the default resolution for the query</li>
+ *   <li>
+ *     Some query languages allow users to change the resolution via a function, and in those cases the returned
+ *     time-series may have a resolution different than stepSeconds
+ *   </li>
+ *   <li>
+ *    The query execution may scan and process data outside of the time-range [start, end]. The actual data scanned
+ *    and processed is defined by the {@link TimeBuckets} used by the operator.
+ *   </li>
+ * </ul>
+ */
+public class RangeTimeSeriesRequest {
+  /** Engine allows a Pinot cluster to support multiple Time-Series Query Languages. */
+  private final String _engine;
+  /** Query is the raw query sent by the caller. */
+  private final String _query;
+  /** Start time of the time-window being queried. */
+  private final long _startSeconds;
+  /** End time of the time-window being queried. */
+  private final long _endSeconds;
+  /**
+   * <b>Optional</b> field which the caller can use to suggest the default resolution for the query. Language
+   * implementations can choose to skip this suggestion and choose their own resolution based on their semantics.
+   */
+  private final long _stepSeconds;
+  /** E2E timeout for the query. */
+  private final Duration _timeout;
+
+  public RangeTimeSeriesRequest(String engine, String query, long startSeconds, long endSeconds, long stepSeconds,
+      Duration timeout) {
+    Preconditions.checkState(endSeconds >= startSeconds, "Invalid range. startSeconds "
+        + "should be greater than or equal to endSeconds. Found startSeconds=%s and endSeconds=%s",
+        startSeconds, endSeconds);
+    _engine = engine;
+    _query = query;
+    _startSeconds = startSeconds;
+    _endSeconds = endSeconds;
+    _stepSeconds = stepSeconds;
+    _timeout = timeout;
+  }
+
+  public String getEngine() {
+    return _engine;
+  }
+
+  public String getQuery() {
+    return _query;
+  }
+
+  public long getStartSeconds() {
+    return _startSeconds;
+  }
+
+  public long getEndSeconds() {
+    return _endSeconds;
+  }
+
+  public long getStepSeconds() {
+    return _stepSeconds;
+  }
+
+  public Duration getTimeout() {
+    return _timeout;
+  }
+}
diff --git a/pinot-timeseries/pinot-timeseries-spi/src/main/java/org/apache/pinot/tsdb/spi/TimeBuckets.java b/pinot-timeseries/pinot-timeseries-spi/src/main/java/org/apache/pinot/tsdb/spi/TimeBuckets.java
new file mode 100644
index 0000000000..866249845e
--- /dev/null
+++ b/pinot-timeseries/pinot-timeseries-spi/src/main/java/org/apache/pinot/tsdb/spi/TimeBuckets.java
@@ -0,0 +1,104 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.tsdb.spi;
+
+import java.time.Duration;
+import java.util.Arrays;
+import java.util.Objects;
+
+
+/**
+ * Time buckets used for query execution. Each element (say x) in the {@link #getTimeBuckets()} array represents a
+ * time-range which is half open on the right side: [x, x + bucketSize.getSeconds()). Some query languages allow some
+ * operators to mutate the time-buckets on the fly, so it is not guaranteed that the time resolution and/or range
+ * will be the same across all operators. For instance, Uber's M3QL supports a "summarize 1h sum" operator which will
+ * change the bucket resolution to 1 hour for all subsequent operators.
+ */
+public class TimeBuckets {
+  private final Long[] _timeBuckets;
+  private final Duration _bucketSize;
+
+  private TimeBuckets(Long[] timeBuckets, Duration bucketSize) {
+    _timeBuckets = timeBuckets;
+    _bucketSize = bucketSize;
+  }
+
+  public Long[] getTimeBuckets() {
+    return _timeBuckets;
+  }
+
+  public Duration getBucketSize() {
+    return _bucketSize;
+  }
+
+  public long getStartTime() {
+    return _timeBuckets[0];
+  }
+
+  public long getEndTime() {
+    return _timeBuckets[_timeBuckets.length - 1];
+  }
+
+  public long getRangeSeconds() {
+    return _timeBuckets[_timeBuckets.length - 1] - _timeBuckets[0];
+  }
+
+  public int getNumBuckets() {
+    return _timeBuckets.length;
+  }
+
+  public int resolveIndex(long timeValue) {
+    if (_timeBuckets.length == 0) {
+      return -1;
+    }
+    if (timeValue < _timeBuckets[0]) {
+      return -1;
+    }
+    if (timeValue >= _timeBuckets[_timeBuckets.length - 1] + _bucketSize.getSeconds()) {
+      return -1;
+    }
+    return (int) ((timeValue - _timeBuckets[0]) / _bucketSize.getSeconds());
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    if (!(o instanceof TimeBuckets)) {
+      return false;
+    }
+    TimeBuckets other = (TimeBuckets) o;
+    return this.getStartTime() == other.getStartTime() && this.getEndTime() == other.getEndTime()
+        && this.getBucketSize().equals(other.getBucketSize());
+  }
+
+  @Override
+  public int hashCode() {
+    int result = Objects.hash(_bucketSize);
+    result = 31 * result + Arrays.hashCode(_timeBuckets);
+    return result;
+  }
+
+  public static TimeBuckets ofSeconds(long startTimeSeconds, Duration bucketSize, int numElements) {
+    long stepSize = bucketSize.getSeconds();
+    Long[] timeBuckets = new Long[numElements];
+    for (int i = 0; i < numElements; i++) {
+      timeBuckets[i] = startTimeSeconds + i * stepSize;
+    }
+    return new TimeBuckets(timeBuckets, bucketSize);
+  }
+}
diff --git a/pinot-timeseries/pinot-timeseries-spi/src/main/java/org/apache/pinot/tsdb/spi/TimeSeriesLogicalPlanResult.java b/pinot-timeseries/pinot-timeseries-spi/src/main/java/org/apache/pinot/tsdb/spi/TimeSeriesLogicalPlanResult.java
new file mode 100644
index 0000000000..829a4042bb
--- /dev/null
+++ b/pinot-timeseries/pinot-timeseries-spi/src/main/java/org/apache/pinot/tsdb/spi/TimeSeriesLogicalPlanResult.java
@@ -0,0 +1,43 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.tsdb.spi;
+
+import org.apache.pinot.tsdb.spi.plan.BaseTimeSeriesPlanNode;
+
+
+/**
+ * The result of logical planning.
+ */
+public class TimeSeriesLogicalPlanResult {
+  private final BaseTimeSeriesPlanNode _planNode;
+  private final TimeBuckets _timeBuckets;
+
+  public TimeSeriesLogicalPlanResult(BaseTimeSeriesPlanNode planNode, TimeBuckets timeBuckets) {
+    _planNode = planNode;
+    _timeBuckets = timeBuckets;
+  }
+
+  public BaseTimeSeriesPlanNode getPlanNode() {
+    return _planNode;
+  }
+
+  public TimeBuckets getTimeBuckets() {
+    return _timeBuckets;
+  }
+}
diff --git a/pinot-timeseries/pinot-timeseries-spi/src/main/java/org/apache/pinot/tsdb/spi/TimeSeriesLogicalPlanner.java b/pinot-timeseries/pinot-timeseries-spi/src/main/java/org/apache/pinot/tsdb/spi/TimeSeriesLogicalPlanner.java
new file mode 100644
index 0000000000..0c7e724ca8
--- /dev/null
+++ b/pinot-timeseries/pinot-timeseries-spi/src/main/java/org/apache/pinot/tsdb/spi/TimeSeriesLogicalPlanner.java
@@ -0,0 +1,39 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.tsdb.spi;
+
+import java.util.Map;
+import org.apache.pinot.tsdb.spi.plan.BaseTimeSeriesPlanNode;
+import org.apache.pinot.tsdb.spi.plan.ScanFilterAndProjectPlanNode;
+
+
+/**
+ * Allows time-series query languages to implement their own logical planner. The input to this planner is a
+ * {@link RangeTimeSeriesRequest} and the output is a {@link TimeSeriesLogicalPlanResult}. Put simply, this abstraction
+ * takes in the query text and other parameters, and returns a logical plan which is a tree of
+ * {@link BaseTimeSeriesPlanNode}. Other than the plan-tree, the planner also returns a {@link TimeBuckets} which is
+ * the default TimeBuckets used by the query operators at runtime. Implementations are free to adjust them as they see
+ * fit. For instance, one query language might want to extend to the left or right of the time-range based on certain
+ * operators. Also, see {@link ScanFilterAndProjectPlanNode#getEffectiveFilter(TimeBuckets)}.
+ */
+public interface TimeSeriesLogicalPlanner {
+  void init(Map<String, Object> config);
+
+  TimeSeriesLogicalPlanResult plan(RangeTimeSeriesRequest request);
+}
diff --git a/pinot-timeseries/pinot-timeseries-spi/src/main/java/org/apache/pinot/tsdb/spi/operator/BaseTimeSeriesOperator.java b/pinot-timeseries/pinot-timeseries-spi/src/main/java/org/apache/pinot/tsdb/spi/operator/BaseTimeSeriesOperator.java
new file mode 100644
index 0000000000..28be511c37
--- /dev/null
+++ b/pinot-timeseries/pinot-timeseries-spi/src/main/java/org/apache/pinot/tsdb/spi/operator/BaseTimeSeriesOperator.java
@@ -0,0 +1,65 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.tsdb.spi.operator;
+
+import java.util.List;
+import org.apache.pinot.tsdb.spi.plan.BaseTimeSeriesPlanNode;
+import org.apache.pinot.tsdb.spi.series.TimeSeriesBlock;
+
+
+/**
+ * Every time-series operator takes in a {@link TimeSeriesBlock} and returns another {@link TimeSeriesBlock}.
+ * Parent operators/callers must call {@link #nextBlock()} to get the next block from the child operators, and implement
+ * {@link #getNextBlock()} to implement the business logic for their operator. Also see {@link BaseTimeSeriesPlanNode}.
+ * TODO: Add common hierarchy with other operators like Multistage and Pinot core. This will likely require us to
+ *   define a pinot-query-spi or add/move some abstractions to pinot-spi.
+ */
+public abstract class BaseTimeSeriesOperator {
+  protected final List<BaseTimeSeriesOperator> _childOperators;
+
+  public BaseTimeSeriesOperator(List<BaseTimeSeriesOperator> childOperators) {
+    _childOperators = childOperators;
+  }
+
+  /**
+   * Called by parent time-series operators.
+   */
+  public final TimeSeriesBlock nextBlock() {
+    long startTime = System.currentTimeMillis();
+    try {
+      return getNextBlock();
+    } finally {
+      // TODO: add stats
+    }
+  }
+
+  public List<BaseTimeSeriesOperator> getChildOperators() {
+    return _childOperators;
+  }
+
+  /**
+   * Time series query languages can implement their own business logic in their operators.
+   */
+  public abstract TimeSeriesBlock getNextBlock();
+
+  /**
+   * Name that will show up in the explain plan.
+   */
+  public abstract String getExplainName();
+}
diff --git a/pinot-timeseries/pinot-timeseries-spi/src/main/java/org/apache/pinot/tsdb/spi/plan/BaseTimeSeriesPlanNode.java b/pinot-timeseries/pinot-timeseries-spi/src/main/java/org/apache/pinot/tsdb/spi/plan/BaseTimeSeriesPlanNode.java
new file mode 100644
index 0000000000..dd7a951752
--- /dev/null
+++ b/pinot-timeseries/pinot-timeseries-spi/src/main/java/org/apache/pinot/tsdb/spi/plan/BaseTimeSeriesPlanNode.java
@@ -0,0 +1,55 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.tsdb.spi.plan;
+
+import java.util.List;
+import org.apache.pinot.tsdb.spi.operator.BaseTimeSeriesOperator;
+
+
+/**
+ * Generic plan node for time series queries. This allows each time-series query language to define their own plan
+ * nodes, which in turn generate the language specific {@link BaseTimeSeriesOperator}.
+ */
+public abstract class BaseTimeSeriesPlanNode {
+  protected final String _id;
+  protected final List<BaseTimeSeriesPlanNode> _children;
+
+  public BaseTimeSeriesPlanNode(String id, List<BaseTimeSeriesPlanNode> children) {
+    _id = id;
+    _children = children;
+  }
+
+  public String getId() {
+    return _id;
+  }
+
+  public List<BaseTimeSeriesPlanNode> getChildren() {
+    return _children;
+  }
+
+  public void addChildNode(BaseTimeSeriesPlanNode planNode) {
+    _children.add(planNode);
+  }
+
+  public abstract String getKlass();
+
+  public abstract String getExplainName();
+
+  public abstract BaseTimeSeriesOperator run();
+}
diff --git a/pinot-timeseries/pinot-timeseries-spi/src/main/java/org/apache/pinot/tsdb/spi/plan/ScanFilterAndProjectPlanNode.java b/pinot-timeseries/pinot-timeseries-spi/src/main/java/org/apache/pinot/tsdb/spi/plan/ScanFilterAndProjectPlanNode.java
new file mode 100644
index 0000000000..e2a0a15f27
--- /dev/null
+++ b/pinot-timeseries/pinot-timeseries-spi/src/main/java/org/apache/pinot/tsdb/spi/plan/ScanFilterAndProjectPlanNode.java
@@ -0,0 +1,127 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.tsdb.spi.plan;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import java.time.Duration;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import org.apache.pinot.tsdb.spi.AggInfo;
+import org.apache.pinot.tsdb.spi.TimeBuckets;
+import org.apache.pinot.tsdb.spi.TimeSeriesLogicalPlanner;
+import org.apache.pinot.tsdb.spi.operator.BaseTimeSeriesOperator;
+
+
+/**
+ * This would typically be the leaf node of a plan-tree generated by a time-series engine's logical planner. At runtime,
+ * this gets compiled to a Combine Operator.
+ * <b>Note:</b> You don't need to pass the time-filter to the filter expression, since Pinot will automatically compute
+ *   the time filter based on the computed time buckets in {@link TimeSeriesLogicalPlanner}.
+ */
+public class ScanFilterAndProjectPlanNode extends BaseTimeSeriesPlanNode {
+  private static final String EXPLAIN_NAME = "SCAN_FILTER_AND_PROJECT";
+  private final String _tableName;
+  private final String _timeColumn;
+  private final TimeUnit _timeUnit;
+  private final Long _offset;
+  private final String _filterExpression;
+  private final String _valueExpression;
+  private final AggInfo _aggInfo;
+  private final List<String> _groupByColumns;
+
+  @JsonCreator
+  public ScanFilterAndProjectPlanNode(
+      @JsonProperty("id") String id, @JsonProperty("children") List<BaseTimeSeriesPlanNode> children,
+      @JsonProperty("tableName") String tableName, @JsonProperty("timeColumn") String timeColumn,
+      @JsonProperty("timeUnit") TimeUnit timeUnit, @JsonProperty("offset") Long offset,
+      @JsonProperty("filterExpression") String filterExpression,
+      @JsonProperty("valueExpression") String valueExpression,
+      @JsonProperty("aggInfo") AggInfo aggInfo, @JsonProperty("groupByColumns") List<String> groupByColumns) {
+    super(id, children);
+    _tableName = tableName;
+    _timeColumn = timeColumn;
+    _timeUnit = timeUnit;
+    // TODO: This is broken technically. Adjust offset to meet TimeUnit resolution. For now use 0 offset.
+    _offset = offset;
+    _filterExpression = filterExpression;
+    _valueExpression = valueExpression;
+    _aggInfo = aggInfo;
+    _groupByColumns = groupByColumns;
+  }
+
+  @Override
+  public String getKlass() {
+    return ScanFilterAndProjectPlanNode.class.getName();
+  }
+
+  @Override
+  public String getExplainName() {
+    return EXPLAIN_NAME;
+  }
+
+  @Override
+  public BaseTimeSeriesOperator run() {
+    throw new UnsupportedOperationException("");
+  }
+
+  public String getTableName() {
+    return _tableName;
+  }
+
+  public String getTimeColumn() {
+    return _timeColumn;
+  }
+
+  public TimeUnit getTimeUnit() {
+    return _timeUnit;
+  }
+
+  public Long getOffset() {
+    return _offset;
+  }
+
+  public String getFilterExpression() {
+    return _filterExpression;
+  }
+
+  public String getValueExpression() {
+    return _valueExpression;
+  }
+
+  public AggInfo getAggInfo() {
+    return _aggInfo;
+  }
+
+  public List<String> getGroupByColumns() {
+    return _groupByColumns;
+  }
+
+  public String getEffectiveFilter(TimeBuckets timeBuckets) {
+    String filter = _filterExpression == null ? "" : _filterExpression;
+    // TODO: This is wrong. offset should be converted to seconds before arithmetic. For now use 0 offset.
+    long startTime = _timeUnit.convert(Duration.ofSeconds(timeBuckets.getStartTime() - _offset));
+    long endTime = _timeUnit.convert(Duration.ofSeconds(timeBuckets.getEndTime() - _offset));
+    String addnFilter = String.format("%s >= %d AND %s <= %d", _timeColumn, startTime, _timeColumn, endTime);
+    if (filter.strip().isEmpty()) {
+      return addnFilter;
+    }
+    return String.format("(%s) AND (%s)", filter, addnFilter);
+  }
+}
diff --git a/pinot-timeseries/pinot-timeseries-spi/src/main/java/org/apache/pinot/tsdb/spi/plan/serde/TimeSeriesPlanSerde.java b/pinot-timeseries/pinot-timeseries-spi/src/main/java/org/apache/pinot/tsdb/spi/plan/serde/TimeSeriesPlanSerde.java
new file mode 100644
index 0000000000..e4e036e1ff
--- /dev/null
+++ b/pinot-timeseries/pinot-timeseries-spi/src/main/java/org/apache/pinot/tsdb/spi/plan/serde/TimeSeriesPlanSerde.java
@@ -0,0 +1,95 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.tsdb.spi.plan.serde;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.DeserializationFeature;
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ArrayNode;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import org.apache.pinot.spi.annotations.InterfaceStability;
+import org.apache.pinot.tsdb.spi.plan.BaseTimeSeriesPlanNode;
+import org.apache.pinot.tsdb.spi.plan.ScanFilterAndProjectPlanNode;
+
+
+/**
+ * We have implemented a custom serialization/deserialization mechanism for 
time series plans. This allows users to
+ * use Jackson to annotate their plan nodes as shown in {@link 
ScanFilterAndProjectPlanNode}, which is used for
+ * plan serde for broker/server communication.
+ * TODO: There are limitations to this and we will change this soon. Issues:
+ *   1. Pinot TS SPI is compiled in Pinot distribution and Jackson deps get 
shaded usually.
+ *   2. The plugins have to shade the dependency in the exact same way, which 
is obviously error-prone and not ideal.
+ */
+@InterfaceStability.Evolving
+public class TimeSeriesPlanSerde {
+  private static final String KLASS_FIELD = "klass";
+  private static final String CHILDREN_FIELD = "children";
+  private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+
+  static {
+    // The serialized form carries properties (such as the "klass" discriminator read below) that need not map to
+    // a constructor/setter on the node, so unknown properties must not fail deserialization.
+    OBJECT_MAPPER.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
+  }
+
+  private TimeSeriesPlanSerde() {
+  }
+
+  /**
+   * Serializes the given plan node (and whatever Jackson reaches from it) to a JSON string.
+   *
+   * @param planNode root of the plan to serialize
+   * @return JSON representation of the plan
+   * @throws RuntimeException wrapping any failure raised by Jackson
+   */
+  public static String serialize(BaseTimeSeriesPlanNode planNode) {
+    try {
+      return OBJECT_MAPPER.writeValueAsString(planNode);
+    } catch (Exception e) {
+      throw new RuntimeException("Caught exception while serializing plan", e);
+    }
+  }
+
+  /**
+   * Parses a JSON string produced by {@link #serialize} back into a plan tree.
+   *
+   * @param planString JSON representation of the plan
+   * @return root node of the deserialized plan
+   * @throws RuntimeException wrapping any parsing, class-loading or validation failure
+   */
+  public static BaseTimeSeriesPlanNode deserialize(String planString) {
+    try {
+      JsonNode jsonNode = OBJECT_MAPPER.readTree(planString);
+      return create(jsonNode);
+    } catch (Exception e) {
+      throw new RuntimeException("Caught exception while deserializing plan", e);
+    }
+  }
+
+  /**
+   * Recursively materializes a plan node from its JSON tree. The node's own fields are bound by Jackson using the
+   * class named in the "klass" field; the "children" array is handled here, one recursive call per child, and each
+   * resulting child is attached via {@code addChildNode}.
+   * <p>The given {@code jsonNode} is not modified.</p>
+   * <p>NOTE(review): the class name comes from the payload. It is restricted to subclasses of
+   * {@link BaseTimeSeriesPlanNode} before Jackson instantiates anything, but the payload should still only come
+   * from trusted peers.</p>
+   *
+   * @param jsonNode JSON object describing a single plan node and, optionally, its children
+   * @return the plan node with all of its children attached
+   * @throws IllegalArgumentException if {@code jsonNode} is not a JSON object or has no "klass" field
+   * @throws ClassCastException if "klass" names a class that is not a {@link BaseTimeSeriesPlanNode}
+   * @throws ClassNotFoundException if "klass" names a class that cannot be loaded
+   * @throws JsonProcessingException if Jackson cannot bind the JSON to the named class
+   */
+  public static BaseTimeSeriesPlanNode create(JsonNode jsonNode)
+      throws JsonProcessingException, ClassNotFoundException {
+    if (!(jsonNode instanceof ObjectNode)) {
+      throw new IllegalArgumentException("Plan node must be a JSON object but found: "
+          + (jsonNode == null ? "null" : jsonNode.getNodeType()));
+    }
+    ObjectNode source = (ObjectNode) jsonNode;
+    JsonNode klassNode = source.get(KLASS_FIELD);
+    if (klassNode == null || klassNode.isNull()) {
+      throw new IllegalArgumentException("Plan node JSON is missing the '" + KLASS_FIELD + "' field");
+    }
+    JsonNode children = source.get(CHILDREN_FIELD);
+    // Bind a shallow copy whose children are replaced by an empty array: Jackson must not try to deserialize the
+    // children itself (they are created manually below), and the caller's tree must stay untouched.
+    ObjectNode withoutChildren = OBJECT_MAPPER.createObjectNode();
+    withoutChildren.setAll(source);
+    withoutChildren.set(CHILDREN_FIELD, OBJECT_MAPPER.createArrayNode());
+    // asSubclass rejects foreign classes before Jackson gets a chance to instantiate them.
+    Class<? extends BaseTimeSeriesPlanNode> klass =
+        Class.forName(klassNode.asText()).asSubclass(BaseTimeSeriesPlanNode.class);
+    BaseTimeSeriesPlanNode planNode = OBJECT_MAPPER.readValue(withoutChildren.toString(), klass);
+    if (children instanceof ArrayNode) {
+      for (JsonNode childJsonNode : children) {
+        planNode.addChildNode(create(childJsonNode));
+      }
+    }
+    return planNode;
+  }
+}
diff --git 
a/pinot-timeseries/pinot-timeseries-spi/src/main/java/org/apache/pinot/tsdb/spi/series/BaseTimeSeriesBuilder.java
 
b/pinot-timeseries/pinot-timeseries-spi/src/main/java/org/apache/pinot/tsdb/spi/series/BaseTimeSeriesBuilder.java
new file mode 100644
index 0000000000..3509e7cfcd
--- /dev/null
+++ 
b/pinot-timeseries/pinot-timeseries-spi/src/main/java/org/apache/pinot/tsdb/spi/series/BaseTimeSeriesBuilder.java
@@ -0,0 +1,75 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.tsdb.spi.series;
+
+import java.util.List;
+import java.util.Objects;
+import javax.annotation.Nullable;
+import org.apache.pinot.tsdb.spi.TimeBuckets;
+
+
+/**
+ * BaseTimeSeriesBuilder allows language implementations to build their own aggregation and other time-series
+ * functions. Each time-series operator would typically call either of {@link #addValue} or
+ * {@link #addValueAtIndex}. When the operator is done, it will call {@link #build()} to allow the builder to
+ * compute the final {@link TimeSeries}.
+ */
+public abstract class BaseTimeSeriesBuilder {
+  protected final String _id;
+  @Nullable
+  protected final Long[] _timeValues;
+  @Nullable
+  protected final TimeBuckets _timeBuckets;
+  protected final List<String> _tagNames;
+  protected final Object[] _tagValues;
+
+  /**
+   * Stores the given references as-is; no array or list is copied.
+   *
+   * @param id identifier of the series being built
+   * @param timeValues explicit time values of the series, or null
+   * @param timeBuckets time buckets of the series, or null
+   * @param tagNames names of the tags of the series
+   * @param tagValues values of the tags of the series
+   */
+  public BaseTimeSeriesBuilder(String id, @Nullable Long[] timeValues, @Nullable TimeBuckets timeBuckets,
+      List<String> tagNames, Object[] tagValues) {
+    _id = id;
+    _timeValues = timeValues;
+    _timeBuckets = timeBuckets;
+    _tagNames = tagNames;
+    _tagValues = tagValues;
+  }
+
+  /** Folds a numeric value into the data point at the given index. */
+  public abstract void addValueAtIndex(int timeBucketIndex, Double value);
+
+  /**
+   * Folds a string value into the data point at the given index. Builders that accept string input must override
+   * this; the default implementation always throws.
+   *
+   * @throws IllegalStateException always, unless overridden
+   */
+  public void addValueAtIndex(int timeBucketIndex, String value) {
+    throw new IllegalStateException("This aggregation function does not support string input");
+  }
+
+  /** Folds a numeric value observed at the given time value into this builder. */
+  public abstract void addValue(long timeValue, Double value);
+
+  /**
+   * Merges a series into this builder point by point, passing each (time value, value) pair to {@link #addValue}.
+   *
+   * @throws NullPointerException if the series has no time values
+   */
+  public void mergeSeries(TimeSeries series) {
+    Double[] seriesValues = series.getValues();
+    int pointCount = seriesValues.length;
+    Long[] seriesTimes = Objects.requireNonNull(series.getTimeValues(),
+        "Cannot merge series: found null timeValues");
+    int idx = 0;
+    while (idx < pointCount) {
+      addValue(seriesTimes[idx], seriesValues[idx]);
+      idx++;
+    }
+  }
+
+  /**
+   * Merges a series into this builder by position: the i-th value of the series is passed to
+   * {@link #addValueAtIndex(int, Double)} with index i.
+   */
+  public void mergeAlignedSeries(TimeSeries series) {
+    Double[] seriesValues = series.getValues();
+    int pointCount = seriesValues.length;
+    int idx = 0;
+    while (idx < pointCount) {
+      addValueAtIndex(idx, seriesValues[idx]);
+      idx++;
+    }
+  }
+
+  /** Produces the final {@link TimeSeries} from everything added so far. */
+  public abstract TimeSeries build();
+}
diff --git 
a/pinot-timeseries/pinot-timeseries-spi/src/main/java/org/apache/pinot/tsdb/spi/series/TimeSeries.java
 
b/pinot-timeseries/pinot-timeseries-spi/src/main/java/org/apache/pinot/tsdb/spi/series/TimeSeries.java
new file mode 100644
index 0000000000..5fcf70b42e
--- /dev/null
+++ 
b/pinot-timeseries/pinot-timeseries-spi/src/main/java/org/apache/pinot/tsdb/spi/series/TimeSeries.java
@@ -0,0 +1,113 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.tsdb.spi.series;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import javax.annotation.Nullable;
+import org.apache.pinot.tsdb.spi.TimeBuckets;
+
+
+/**
+ * Logically, a time-series is a list of pairs of time and data values, where 
time is stored in increasing order.
+ * A time-series is identified using its ID, which can be retrieved using 
{@link #getId()}.
+ * A time series typically also has a set of pairs of keys and values which 
are called tags or labels.
+ * We allow a Series to store time either via {@link TimeBuckets} or via a 
long array as in {@link #getTimeValues()}.
+ * Using {@link TimeBuckets} is ideal when your queries are working on evenly 
spaced time ranges. The other option
+ * exists to support use-cases such as "Instant Vectors" in PromQL.
+ * <p>
+ *   <b>Warning:</b> The time and value arrays passed to the Series are not 
copied, and can be modified by anyone with
+ *   access to them. This is by design, to make it easier to re-use buffers 
during time-series operations.
+ * </p>
+ */
+public class TimeSeries {
+  private final String _id;
+  private final Long[] _timeValues;
+  private final TimeBuckets _timeBuckets;
+  private final Double[] _values;
+  private final List<String> _tagNames;
+  private final Object[] _tagValues;
+
+  /**
+   * Creates a series over the given arrays. The arrays are stored by reference (not copied); the tag-name list is
+   * wrapped in an unmodifiable view.
+   *
+   * @param id identifier of the series
+   * @param timeValues explicit time values, or null
+   * @param timeBuckets time buckets, or null
+   * @param values data values of the series
+   * @param tagNames names of the tags; must not be null
+   * @param tagValues values of the tags, positionally matching {@code tagNames}
+   */
+  public TimeSeries(String id, @Nullable Long[] timeValues, @Nullable TimeBuckets timeBuckets, Double[] values,
+      List<String> tagNames, Object[] tagValues) {
+    _id = id;
+    _timeValues = timeValues;
+    _timeBuckets = timeBuckets;
+    _values = values;
+    _tagNames = Collections.unmodifiableList(tagNames);
+    _tagValues = tagValues;
+  }
+
+  /** Returns the identifier of this series. */
+  public String getId() {
+    return _id;
+  }
+
+  /** Returns the explicit time values of this series, or null if none were provided. */
+  @Nullable
+  public Long[] getTimeValues() {
+    return _timeValues;
+  }
+
+  /** Returns the time buckets of this series, or null if none were provided. */
+  @Nullable
+  public TimeBuckets getTimeBuckets() {
+    return _timeBuckets;
+  }
+
+  /** Returns the backing value array (not a copy). */
+  public Double[] getValues() {
+    return _values;
+  }
+
+  /** Returns an unmodifiable view of the tag names. */
+  public List<String> getTagNames() {
+    return _tagNames;
+  }
+
+  /** Returns the backing tag-value array (not a copy). */
+  public Object[] getTagValues() {
+    return _tagValues;
+  }
+
+  /**
+   * Returns a new map from tag name to the string form of its value; a null tag value is rendered as the string
+   * "null".
+   */
+  public Map<String, String> getTagKeyValuesAsMap() {
+    Map<String, String> tagsByName = new HashMap<>();
+    int tagCount = _tagNames.size();
+    for (int pos = 0; pos < tagCount; pos++) {
+      Object rawValue = _tagValues[pos];
+      String rendered;
+      if (rawValue == null) {
+        rendered = "null";
+      } else {
+        rendered = rawValue.toString();
+      }
+      tagsByName.put(_tagNames.get(pos), rendered);
+    }
+    return tagsByName;
+  }
+
+  /**
+   * Renders the tags as comma-separated {@code name=value} pairs in tag order, or "*" when the series has no tags.
+   */
+  public String getTagsSerialized() {
+    if (_tagNames.isEmpty()) {
+      return "*";
+    }
+    StringBuilder serialized = new StringBuilder();
+    String separator = "";
+    for (int pos = 0; pos < _tagNames.size(); pos++) {
+      serialized.append(separator);
+      serialized.append(String.format("%s=%s", _tagNames.get(pos), _tagValues[pos]));
+      separator = ",";
+    }
+    return serialized.toString();
+  }
+
+  // TODO: This can be cleaned up
+  /** Returns {@link Objects#hash} of the given elements, widened to a long. */
+  public static long hash(Object[] tagNamesAndValues) {
+    int combined = Objects.hash(tagNamesAndValues);
+    return combined;
+  }
+}
diff --git 
a/pinot-timeseries/pinot-timeseries-spi/src/main/java/org/apache/pinot/tsdb/spi/series/TimeSeriesBlock.java
 
b/pinot-timeseries/pinot-timeseries-spi/src/main/java/org/apache/pinot/tsdb/spi/series/TimeSeriesBlock.java
new file mode 100644
index 0000000000..fe7fd5be42
--- /dev/null
+++ 
b/pinot-timeseries/pinot-timeseries-spi/src/main/java/org/apache/pinot/tsdb/spi/series/TimeSeriesBlock.java
@@ -0,0 +1,48 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.tsdb.spi.series;
+
+import java.util.List;
+import java.util.Map;
+import org.apache.pinot.tsdb.spi.TimeBuckets;
+
+
+/**
+ * Block used by time series operators. We store the series data in a map 
keyed by the series' ID. The value is a
+ * list of series, because some query languages support "union" operations 
which allow series with the same tags/labels
+ * to exist either in the query response or temporarily during execution 
before some n-ary series function
+ * is applied.
+ */
+public class TimeSeriesBlock {
+  private final TimeBuckets _timeBuckets;
+  // NOTE(review): the Long key is presumably the series hash (see TimeSeries.hash) - confirm against producers.
+  private final Map<Long, List<TimeSeries>> _seriesMap;
+
+  /**
+   * Creates a block over the given buckets and series map. Both references are stored as-is (the map is not
+   * copied).
+   */
+  public TimeSeriesBlock(TimeBuckets timeBuckets, Map<Long, List<TimeSeries>> 
seriesMap) {
+    _timeBuckets = timeBuckets;
+    _seriesMap = seriesMap;
+  }
+
+  /** Returns the time buckets this block was created with. */
+  public TimeBuckets getTimeBuckets() {
+    return _timeBuckets;
+  }
+
+  /** Returns the series map this block was created with (the same instance, not a copy). */
+  public Map<Long, List<TimeSeries>> getSeriesMap() {
+    return _seriesMap;
+  }
+}
diff --git 
a/pinot-timeseries/pinot-timeseries-spi/src/main/java/org/apache/pinot/tsdb/spi/series/TimeSeriesBuilderFactory.java
 
b/pinot-timeseries/pinot-timeseries-spi/src/main/java/org/apache/pinot/tsdb/spi/series/TimeSeriesBuilderFactory.java
new file mode 100644
index 0000000000..088f9b3c85
--- /dev/null
+++ 
b/pinot-timeseries/pinot-timeseries-spi/src/main/java/org/apache/pinot/tsdb/spi/series/TimeSeriesBuilderFactory.java
@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.tsdb.spi.series;
+
+import java.util.List;
+import org.apache.pinot.spi.env.PinotConfiguration;
+import org.apache.pinot.tsdb.spi.AggInfo;
+import org.apache.pinot.tsdb.spi.TimeBuckets;
+
+
+/**
+ * Factory for {@link BaseTimeSeriesBuilder} instances. {@link TimeSeriesBuilderFactoryProvider} instantiates one
+ * implementation per enabled language through its public no-arg constructor and then calls {@link #init}.
+ */
+public abstract class TimeSeriesBuilderFactory {
+  /**
+   * Creates a new builder for a single series.
+   *
+   * @param aggInfo aggregation the builder should implement
+   * @param id identifier of the series being built
+   * @param timeBuckets time buckets of the series
+   * @param tagNames names of the tags of the series
+   * @param tagValues values of the tags of the series
+   * @return a fresh builder
+   */
+  public abstract BaseTimeSeriesBuilder newTimeSeriesBuilder(
+      AggInfo aggInfo,
+      String id,
+      TimeBuckets timeBuckets,
+      List<String> tagNames,
+      Object[] tagValues);
+
+  /**
+   * Initializes the factory. The provider passes the subset of the Pinot configuration under the language's
+   * time-series config prefix.
+   */
+  public abstract void init(PinotConfiguration pinotConfiguration);
+}
diff --git 
a/pinot-timeseries/pinot-timeseries-spi/src/main/java/org/apache/pinot/tsdb/spi/series/TimeSeriesBuilderFactoryProvider.java
 
b/pinot-timeseries/pinot-timeseries-spi/src/main/java/org/apache/pinot/tsdb/spi/series/TimeSeriesBuilderFactoryProvider.java
new file mode 100644
index 0000000000..b757579839
--- /dev/null
+++ 
b/pinot-timeseries/pinot-timeseries-spi/src/main/java/org/apache/pinot/tsdb/spi/series/TimeSeriesBuilderFactoryProvider.java
@@ -0,0 +1,63 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.tsdb.spi.series;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Objects;
+import org.apache.pinot.spi.env.PinotConfiguration;
+import org.apache.pinot.tsdb.spi.PinotTimeSeriesConfiguration;
+
+
+/**
+ * Loads all series builder providers for all configured time-series query 
languages.
+ */
+public class TimeSeriesBuilderFactoryProvider {
+  private static final Map<String, TimeSeriesBuilderFactory> FACTORY_MAP = new HashMap<>();
+
+  private TimeSeriesBuilderFactoryProvider() {
+  }
+
+  /**
+   * Instantiates, initializes and registers one {@link TimeSeriesBuilderFactory} per enabled time-series language.
+   * The enabled languages are read as a comma-separated list; surrounding whitespace is ignored and empty entries
+   * are skipped, so a configuration with no enabled language registers nothing. Each factory is created through
+   * its public no-arg constructor and initialized with the configuration subset under
+   * {@code PinotTimeSeriesConfiguration.CONFIG_PREFIX + "." + language}.
+   *
+   * @param pinotConfiguration configuration holding the enabled languages and their factory class names
+   * @throws IllegalStateException if an enabled language has no factory class configured
+   * @throws RuntimeException if a factory class cannot be loaded, instantiated or initialized, or is not a
+   *         {@link TimeSeriesBuilderFactory}
+   */
+  public static void init(PinotConfiguration pinotConfiguration) {
+    String enabledLanguages =
+        pinotConfiguration.getProperty(PinotTimeSeriesConfiguration.getEnabledLanguagesConfigKey(), "");
+    for (String rawLanguage : enabledLanguages.split(",")) {
+      String language = rawLanguage.trim();
+      // "".split(",") yields a single empty string, and "a,,b" or "a, " yield empty/blank entries: none of these
+      // name a language, so skip them rather than looking up a factory for the empty language.
+      if (language.isEmpty()) {
+        continue;
+      }
+      String factoryConfigKey = PinotTimeSeriesConfiguration.getSeriesBuilderFactoryConfigKey(language);
+      String seriesBuilderClass = pinotConfiguration.getProperty(factoryConfigKey);
+      if (seriesBuilderClass == null || seriesBuilderClass.trim().isEmpty()) {
+        throw new IllegalStateException("No series builder factory class configured for language: " + language
+            + " (expected config key: " + factoryConfigKey + ")");
+      }
+      try {
+        Object untypedSeriesBuilderFactory = Class.forName(seriesBuilderClass).getConstructor().newInstance();
+        if (!(untypedSeriesBuilderFactory instanceof TimeSeriesBuilderFactory)) {
+          throw new RuntimeException("Series builder factory class " + seriesBuilderClass
+              + " does not extend TimeSeriesBuilderFactory");
+        }
+        TimeSeriesBuilderFactory seriesBuilderFactory = (TimeSeriesBuilderFactory) untypedSeriesBuilderFactory;
+        seriesBuilderFactory.init(pinotConfiguration.subset(
+            PinotTimeSeriesConfiguration.CONFIG_PREFIX + "." + language));
+        FACTORY_MAP.put(language, seriesBuilderFactory);
+      } catch (Exception e) {
+        throw new RuntimeException("Failed to load series builder factory for language: " + language, e);
+      }
+    }
+  }
+
+  /**
+   * Returns the factory registered by {@link #init} for the given engine (language).
+   *
+   * @throws NullPointerException if no factory was registered for {@code engine}
+   */
+  public static TimeSeriesBuilderFactory getSeriesBuilderFactory(String engine) {
+    return Objects.requireNonNull(FACTORY_MAP.get(engine),
+        "No series builder factory found for engine: " + engine);
+  }
+}
diff --git 
a/pinot-timeseries/pinot-timeseries-spi/src/main/java/org/apache/pinot/tsdb/spi/series/builders/MaxTimeSeriesBuilder.java
 
b/pinot-timeseries/pinot-timeseries-spi/src/main/java/org/apache/pinot/tsdb/spi/series/builders/MaxTimeSeriesBuilder.java
new file mode 100644
index 0000000000..742b1b32c6
--- /dev/null
+++ 
b/pinot-timeseries/pinot-timeseries-spi/src/main/java/org/apache/pinot/tsdb/spi/series/builders/MaxTimeSeriesBuilder.java
@@ -0,0 +1,57 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.tsdb.spi.series.builders;
+
+import java.util.List;
+import org.apache.pinot.tsdb.spi.TimeBuckets;
+import org.apache.pinot.tsdb.spi.series.BaseTimeSeriesBuilder;
+import org.apache.pinot.tsdb.spi.series.TimeSeries;
+
+
+/**
+ * MaxTimeSeriesBuilder is a series builder that computes the maximum value in each time bucket.
+ * <b>Context:</b> We provide some ready to use implementations for some of the most common use-cases in the SPI.
+ * This reduces redundancy and also serves as a reference implementation for language developers.
+ */
+public class MaxTimeSeriesBuilder extends BaseTimeSeriesBuilder {
+  // One slot per time bucket; a slot stays null until the first non-null value is added for that bucket.
+  private final Double[] _values;
+
+  /**
+   * @param id identifier of the series being built
+   * @param timeBuckets buckets of the series; determines the number of data points
+   * @param tagNames names of the tags of the series
+   * @param tagValues values of the tags of the series
+   */
+  public MaxTimeSeriesBuilder(String id, TimeBuckets timeBuckets, List<String> tagNames, Object[] tagValues) {
+    super(id, null, timeBuckets, tagNames, tagValues);
+    _values = new Double[timeBuckets.getNumBuckets()];
+  }
+
+  /**
+   * Keeps the larger of the current bucket value and {@code value}. A null {@code value} is ignored: built series
+   * contain null for buckets that never received data, and merging such a series passes those nulls straight back
+   * in here.
+   */
+  @Override
+  public void addValueAtIndex(int timeBucketIndex, Double value) {
+    if (value == null) {
+      return;
+    }
+    Double current = _values[timeBucketIndex];
+    if (current == null || value > current) {
+      _values[timeBucketIndex] = value;
+    }
+  }
+
+  /** Resolves the bucket for {@code timeValue} and delegates to {@link #addValueAtIndex(int, Double)}. */
+  @Override
+  public void addValue(long timeValue, Double value) {
+    int timeBucketIndex = _timeBuckets.resolveIndex(timeValue);
+    addValueAtIndex(timeBucketIndex, value);
+  }
+
+  /** Returns a series over the builder's value array (shared, not copied); untouched buckets hold null. */
+  @Override
+  public TimeSeries build() {
+    return new TimeSeries(_id, null, _timeBuckets, _values, _tagNames, _tagValues);
+  }
+}
diff --git 
a/pinot-timeseries/pinot-timeseries-spi/src/main/java/org/apache/pinot/tsdb/spi/series/builders/MinTimeSeriesBuilder.java
 
b/pinot-timeseries/pinot-timeseries-spi/src/main/java/org/apache/pinot/tsdb/spi/series/builders/MinTimeSeriesBuilder.java
new file mode 100644
index 0000000000..93cdab77d4
--- /dev/null
+++ 
b/pinot-timeseries/pinot-timeseries-spi/src/main/java/org/apache/pinot/tsdb/spi/series/builders/MinTimeSeriesBuilder.java
@@ -0,0 +1,57 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.tsdb.spi.series.builders;
+
+import java.util.List;
+import org.apache.pinot.tsdb.spi.TimeBuckets;
+import org.apache.pinot.tsdb.spi.series.BaseTimeSeriesBuilder;
+import org.apache.pinot.tsdb.spi.series.TimeSeries;
+
+
+/**
+ * MinTimeSeriesBuilder is a series builder that computes the minimum value in each time bucket.
+ * <b>Context:</b> We provide some ready to use implementations for some of the most common use-cases in the SPI.
+ * This reduces redundancy and also serves as a reference implementation for language developers.
+ */
+public class MinTimeSeriesBuilder extends BaseTimeSeriesBuilder {
+  // One slot per time bucket; a slot stays null until the first non-null value is added for that bucket.
+  private final Double[] _values;
+
+  /**
+   * @param id identifier of the series being built
+   * @param timeBuckets buckets of the series; determines the number of data points
+   * @param tagNames names of the tags of the series
+   * @param tagValues values of the tags of the series
+   */
+  public MinTimeSeriesBuilder(String id, TimeBuckets timeBuckets, List<String> tagNames, Object[] tagValues) {
+    super(id, null, timeBuckets, tagNames, tagValues);
+    _values = new Double[timeBuckets.getNumBuckets()];
+  }
+
+  /**
+   * Keeps the smaller of the current bucket value and {@code value}. A null {@code value} is ignored: built series
+   * contain null for buckets that never received data, and merging such a series passes those nulls straight back
+   * in here.
+   */
+  @Override
+  public void addValueAtIndex(int timeBucketIndex, Double value) {
+    if (value == null) {
+      return;
+    }
+    Double current = _values[timeBucketIndex];
+    if (current == null || value < current) {
+      _values[timeBucketIndex] = value;
+    }
+  }
+
+  /** Resolves the bucket for {@code timeValue} and delegates to {@link #addValueAtIndex(int, Double)}. */
+  @Override
+  public void addValue(long timeValue, Double value) {
+    int timeBucketIndex = _timeBuckets.resolveIndex(timeValue);
+    addValueAtIndex(timeBucketIndex, value);
+  }
+
+  /** Returns a series over the builder's value array (shared, not copied); untouched buckets hold null. */
+  @Override
+  public TimeSeries build() {
+    return new TimeSeries(_id, null, _timeBuckets, _values, _tagNames, _tagValues);
+  }
+}
diff --git 
a/pinot-timeseries/pinot-timeseries-spi/src/main/java/org/apache/pinot/tsdb/spi/series/builders/SummingTimeSeriesBuilder.java
 
b/pinot-timeseries/pinot-timeseries-spi/src/main/java/org/apache/pinot/tsdb/spi/series/builders/SummingTimeSeriesBuilder.java
new file mode 100644
index 0000000000..2cf723b8e4
--- /dev/null
+++ 
b/pinot-timeseries/pinot-timeseries-spi/src/main/java/org/apache/pinot/tsdb/spi/series/builders/SummingTimeSeriesBuilder.java
@@ -0,0 +1,55 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.tsdb.spi.series.builders;
+
+import java.util.List;
+import org.apache.pinot.tsdb.spi.TimeBuckets;
+import org.apache.pinot.tsdb.spi.series.BaseTimeSeriesBuilder;
+import org.apache.pinot.tsdb.spi.series.TimeSeries;
+
+
+/**
+ * SummingTimeSeriesBuilder is a series builder that computes the sum of all values in each time bucket.
+ * <b>Context:</b> We provide some ready to use implementations for some of the most common use-cases in the SPI.
+ * This reduces redundancy and also serves as a reference implementation for language developers.
+ */
+public class SummingTimeSeriesBuilder extends BaseTimeSeriesBuilder {
+  // One slot per time bucket; a slot stays null until the first non-null value is added for that bucket.
+  private final Double[] _values;
+
+  /**
+   * @param id identifier of the series being built
+   * @param timeBuckets buckets of the series; determines the number of data points
+   * @param tagNames names of the tags of the series
+   * @param tagValues values of the tags of the series
+   */
+  public SummingTimeSeriesBuilder(String id, TimeBuckets timeBuckets, List<String> tagNames, Object[] tagValues) {
+    super(id, null, timeBuckets, tagNames, tagValues);
+    _values = new Double[timeBuckets.getNumBuckets()];
+  }
+
+  /**
+   * Adds {@code value} to the running sum of the bucket, starting from zero for an untouched bucket. A null
+   * {@code value} is ignored: built series contain null for buckets that never received data, and merging such a
+   * series passes those nulls straight back in here.
+   */
+  @Override
+  public void addValueAtIndex(int timeBucketIndex, Double value) {
+    if (value == null) {
+      return;
+    }
+    Double current = _values[timeBucketIndex];
+    _values[timeBucketIndex] = (current == null ? 0 : current) + value;
+  }
+
+  /** Resolves the bucket for {@code timeValue} and delegates to {@link #addValueAtIndex(int, Double)}. */
+  @Override
+  public void addValue(long timeValue, Double value) {
+    int timeBucketIndex = _timeBuckets.resolveIndex(timeValue);
+    addValueAtIndex(timeBucketIndex, value);
+  }
+
+  /** Returns a series over the builder's value array (shared, not copied); untouched buckets hold null. */
+  @Override
+  public TimeSeries build() {
+    return new TimeSeries(_id, null, _timeBuckets, _values, _tagNames, _tagValues);
+  }
+}
diff --git 
a/pinot-timeseries/pinot-timeseries-spi/src/test/java/org/apache/pinot/tsdb/spi/plan/serde/TimeSeriesPlanSerdeTest.java
 
b/pinot-timeseries/pinot-timeseries-spi/src/test/java/org/apache/pinot/tsdb/spi/plan/serde/TimeSeriesPlanSerdeTest.java
new file mode 100644
index 0000000000..ff74b6ef35
--- /dev/null
+++ 
b/pinot-timeseries/pinot-timeseries-spi/src/test/java/org/apache/pinot/tsdb/spi/plan/serde/TimeSeriesPlanSerdeTest.java
@@ -0,0 +1,52 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.tsdb.spi.plan.serde;
+
+import java.util.ArrayList;
+import java.util.concurrent.TimeUnit;
+import org.apache.pinot.tsdb.spi.AggInfo;
+import org.apache.pinot.tsdb.spi.plan.BaseTimeSeriesPlanNode;
+import org.apache.pinot.tsdb.spi.plan.ScanFilterAndProjectPlanNode;
+import org.testng.annotations.Test;
+
+import static org.testng.Assert.*;
+
+
+public class TimeSeriesPlanSerdeTest {
+  /**
+   * Round-trips a childless {@link ScanFilterAndProjectPlanNode} through {@link TimeSeriesPlanSerde} and checks
+   * that the node type and every scalar field survive.
+   */
+  @Test
+  public void testSerdeForScanFilterProjectNode() {
+    ScanFilterAndProjectPlanNode original =
+        new ScanFilterAndProjectPlanNode("sfp#0", new ArrayList<>(), "myTable", "myTimeColumn",
+            TimeUnit.MILLISECONDS, 0L, "myFilterExpression", "myValueExpression", new AggInfo("SUM"),
+            new ArrayList<>());
+    String serializedPlan = TimeSeriesPlanSerde.serialize(original);
+    BaseTimeSeriesPlanNode roundTripped = TimeSeriesPlanSerde.deserialize(serializedPlan);
+    assertTrue(roundTripped instanceof ScanFilterAndProjectPlanNode);
+    ScanFilterAndProjectPlanNode restored = (ScanFilterAndProjectPlanNode) roundTripped;
+    assertEquals(restored.getTableName(), "myTable");
+    assertEquals(restored.getTimeColumn(), "myTimeColumn");
+    assertEquals(restored.getTimeUnit(), TimeUnit.MILLISECONDS);
+    assertEquals(restored.getOffset(), 0L);
+    assertEquals(restored.getFilterExpression(), "myFilterExpression");
+    assertEquals(restored.getValueExpression(), "myValueExpression");
+    assertNotNull(restored.getAggInfo());
+    assertEquals(restored.getGroupByColumns().size(), 0);
+  }
+}
diff --git a/pinot-timeseries/pom.xml b/pinot-timeseries/pom.xml
new file mode 100644
index 0000000000..f49d0cb922
--- /dev/null
+++ b/pinot-timeseries/pom.xml
@@ -0,0 +1,44 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+
+    Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied.  See the License for the
+    specific language governing permissions and limitations
+    under the License.
+
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+  <modelVersion>4.0.0</modelVersion>
+  <parent>
+    <groupId>org.apache.pinot</groupId>
+    <artifactId>pinot</artifactId>
+    <version>1.3.0-SNAPSHOT</version>
+    <relativePath>..</relativePath>
+  </parent>
+  <packaging>pom</packaging>
+
+  <artifactId>pinot-timeseries</artifactId>
+
+  <properties>
+    <pinot.root>${basedir}/..</pinot.root>
+  </properties>
+
+  <modules>
+    <module>pinot-timeseries-spi</module>
+  </modules>
+
+</project>
diff --git a/pom.xml b/pom.xml
index 066b542250..3f701f474f 100644
--- a/pom.xml
+++ b/pom.xml
@@ -58,6 +58,7 @@
     <module>pinot-compatibility-verifier</module>
     <module>pinot-query-planner</module>
     <module>pinot-query-runtime</module>
+    <module>pinot-timeseries</module>
   </modules>
 
   <licenses>


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org

Reply via email to