This is an automated email from the ASF dual-hosted git repository.

ankitsultana pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 4709954097 Part-4: Remove Unnecessary TimeSeries Materialization + Minor Cleanups (#14092)
4709954097 is described below

commit 4709954097703f88dbd419ab0436a4c1356e9c85
Author: Ankit Sultana <ankitsult...@uber.com>
AuthorDate: Fri Sep 27 16:56:39 2024 +0530

    Part-4: Remove Unnecessary TimeSeries Materialization + Minor Cleanups (#14092)
---
 .../broker/api/resources/PinotClientRequest.java   |  2 +-
 .../operator/blocks/TimeSeriesBuilderBlock.java    | 61 ++++++++++++++++++++++
 .../operator/blocks/results/ResultsBlockUtils.java | 13 +++++
 .../blocks/results/TimeSeriesResultsBlock.java     | 12 ++---
 .../merger/TimeSeriesAggResultsBlockMerger.java    | 32 ++++--------
 .../timeseries/TimeSeriesAggregationOperator.java  | 11 +---
 .../timeseries/TimeSeriesPassThroughOperator.java  | 52 ------------------
 .../query/executor/ServerQueryExecutorV1Impl.java  | 13 +----
 .../core/query/executor/QueryExecutorTest.java     | 14 +++--
 .../pinot/tsdb/m3ql/M3TimeSeriesPlanner.java       |  8 +--
 .../pinot/tsdb/m3ql/time/TimeBucketComputer.java   |  4 +-
 .../runtime/timeseries/LeafTimeSeriesOperator.java |  4 +-
 .../timeseries/PhysicalTimeSeriesPlanVisitor.java  | 10 ++--
 .../tsdb/planner/TimeSeriesQueryEnvironment.java   |  8 +--
 .../tsdb/planner/physical/TableScanVisitor.java    |  6 +--
 .../pinot/tsdb/spi/TimeSeriesLogicalPlanner.java   |  8 +--
 ...ctPlanNode.java => LeafTimeSeriesPlanNode.java} |  8 +--
 .../tsdb/spi/plan/serde/TimeSeriesPlanSerde.java   |  4 +-
 .../tsdb/spi/series/BaseTimeSeriesBuilder.java     |  9 ++++
 .../spi/plan/serde/TimeSeriesPlanSerdeTest.java    | 10 ++--
 20 files changed, 147 insertions(+), 142 deletions(-)

diff --git 
a/pinot-broker/src/main/java/org/apache/pinot/broker/api/resources/PinotClientRequest.java
 
b/pinot-broker/src/main/java/org/apache/pinot/broker/api/resources/PinotClientRequest.java
index bb9e36f204..0d0d2ee5b4 100644
--- 
a/pinot-broker/src/main/java/org/apache/pinot/broker/api/resources/PinotClientRequest.java
+++ 
b/pinot-broker/src/main/java/org/apache/pinot/broker/api/resources/PinotClientRequest.java
@@ -259,7 +259,7 @@ public class PinotClientRequest {
         asyncResponse.resume(response);
       }
     } catch (Exception e) {
-      LOGGER.error("Caught exception while processing POST request", e);
+      LOGGER.error("Caught exception while processing GET request", e);
       
_brokerMetrics.addMeteredGlobalValue(BrokerMeter.UNCAUGHT_POST_EXCEPTIONS, 1L);
       asyncResponse.resume(Response.serverError().entity(
               new PinotBrokerTimeSeriesResponse("error", null, 
e.getClass().getSimpleName(), e.getMessage()))
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/operator/blocks/TimeSeriesBuilderBlock.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/operator/blocks/TimeSeriesBuilderBlock.java
new file mode 100644
index 0000000000..0f9eda897d
--- /dev/null
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/operator/blocks/TimeSeriesBuilderBlock.java
@@ -0,0 +1,61 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.operator.blocks;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import 
org.apache.pinot.core.operator.combine.merger.TimeSeriesAggResultsBlockMerger;
+import org.apache.pinot.tsdb.spi.TimeBuckets;
+import org.apache.pinot.tsdb.spi.series.BaseTimeSeriesBuilder;
+import org.apache.pinot.tsdb.spi.series.TimeSeries;
+import org.apache.pinot.tsdb.spi.series.TimeSeriesBlock;
+
+
+/**
+ * Block used by the {@link TimeSeriesAggResultsBlockMerger}.
+ */
+public class TimeSeriesBuilderBlock {
+  private final TimeBuckets _timeBuckets;
+  private final Map<Long, BaseTimeSeriesBuilder> _seriesBuilderMap;
+
+  public TimeSeriesBuilderBlock(TimeBuckets timeBuckets, Map<Long, 
BaseTimeSeriesBuilder> seriesBuilderMap) {
+    _timeBuckets = timeBuckets;
+    _seriesBuilderMap = seriesBuilderMap;
+  }
+
+  public TimeBuckets getTimeBuckets() {
+    return _timeBuckets;
+  }
+
+  public Map<Long, BaseTimeSeriesBuilder> getSeriesBuilderMap() {
+    return _seriesBuilderMap;
+  }
+
+  public TimeSeriesBlock build() {
+    Map<Long, List<TimeSeries>> seriesMap = new HashMap<>();
+    for (var entry : _seriesBuilderMap.entrySet()) {
+      List<TimeSeries> result = new ArrayList<>(1);
+      result.add(entry.getValue().build());
+      seriesMap.put(entry.getKey(), result);
+    }
+    return new TimeSeriesBlock(_timeBuckets, seriesMap);
+  }
+}
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/operator/blocks/results/ResultsBlockUtils.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/operator/blocks/results/ResultsBlockUtils.java
index 9775d04d1c..5969053755 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/operator/blocks/results/ResultsBlockUtils.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/operator/blocks/results/ResultsBlockUtils.java
@@ -18,6 +18,7 @@
  */
 package org.apache.pinot.core.operator.blocks.results;
 
+import com.google.common.base.Preconditions;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
@@ -25,8 +26,10 @@ import java.util.List;
 import org.apache.commons.lang3.tuple.Pair;
 import org.apache.pinot.common.request.context.ExpressionContext;
 import org.apache.pinot.common.request.context.FilterContext;
+import org.apache.pinot.common.request.context.TimeSeriesContext;
 import org.apache.pinot.common.utils.DataSchema;
 import org.apache.pinot.common.utils.DataSchema.ColumnDataType;
+import org.apache.pinot.core.operator.blocks.TimeSeriesBuilderBlock;
 import org.apache.pinot.core.query.aggregation.function.AggregationFunction;
 import 
org.apache.pinot.core.query.aggregation.function.AggregationFunctionUtils;
 import org.apache.pinot.core.query.distinct.DistinctTable;
@@ -40,6 +43,9 @@ public class ResultsBlockUtils {
   }
 
   public static BaseResultsBlock buildEmptyQueryResults(QueryContext 
queryContext) {
+    if (QueryContextUtils.isTimeSeriesQuery(queryContext)) {
+      return buildEmptyTimeSeriesResults(queryContext);
+    }
     if (QueryContextUtils.isSelectionQuery(queryContext)) {
       return buildEmptySelectionQueryResults(queryContext);
     }
@@ -117,4 +123,11 @@ public class ResultsBlockUtils {
         queryContext.isNullHandlingEnabled());
     return new DistinctResultsBlock(distinctTable, queryContext);
   }
+
+  private static TimeSeriesResultsBlock 
buildEmptyTimeSeriesResults(QueryContext queryContext) {
+    TimeSeriesContext timeSeriesContext = queryContext.getTimeSeriesContext();
+    Preconditions.checkNotNull(timeSeriesContext);
+    return new TimeSeriesResultsBlock(
+        new TimeSeriesBuilderBlock(timeSeriesContext.getTimeBuckets(), 
Collections.emptyMap()));
+  }
 }
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/operator/blocks/results/TimeSeriesResultsBlock.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/operator/blocks/results/TimeSeriesResultsBlock.java
index 085d558d0f..30a66bd624 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/operator/blocks/results/TimeSeriesResultsBlock.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/operator/blocks/results/TimeSeriesResultsBlock.java
@@ -23,15 +23,15 @@ import java.util.List;
 import javax.annotation.Nullable;
 import org.apache.pinot.common.datatable.DataTable;
 import org.apache.pinot.common.utils.DataSchema;
+import org.apache.pinot.core.operator.blocks.TimeSeriesBuilderBlock;
 import org.apache.pinot.core.query.request.context.QueryContext;
-import org.apache.pinot.tsdb.spi.series.TimeSeriesBlock;
 
 
 public class TimeSeriesResultsBlock extends BaseResultsBlock {
-  private final TimeSeriesBlock _seriesBlock;
+  private final TimeSeriesBuilderBlock _timeSeriesBuilderBlock;
 
-  public TimeSeriesResultsBlock(TimeSeriesBlock seriesBlock) {
-    _seriesBlock = seriesBlock;
+  public TimeSeriesResultsBlock(TimeSeriesBuilderBlock timeSeriesBuilderBlock) 
{
+    _timeSeriesBuilderBlock = timeSeriesBuilderBlock;
   }
 
   @Override
@@ -66,7 +66,7 @@ public class TimeSeriesResultsBlock extends BaseResultsBlock {
     return null;
   }
 
-  public TimeSeriesBlock getTimeSeriesBlock() {
-    return _seriesBlock;
+  public TimeSeriesBuilderBlock getTimeSeriesBuilderBlock() {
+    return _timeSeriesBuilderBlock;
   }
 }
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/merger/TimeSeriesAggResultsBlockMerger.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/merger/TimeSeriesAggResultsBlockMerger.java
index 283073c9ce..17f22a1737 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/merger/TimeSeriesAggResultsBlockMerger.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/merger/TimeSeriesAggResultsBlockMerger.java
@@ -18,14 +18,10 @@
  */
 package org.apache.pinot.core.operator.combine.merger;
 
-import com.google.common.collect.ImmutableList;
-import java.util.ArrayList;
-import java.util.List;
+import org.apache.pinot.core.operator.blocks.TimeSeriesBuilderBlock;
 import org.apache.pinot.core.operator.blocks.results.TimeSeriesResultsBlock;
 import org.apache.pinot.tsdb.spi.AggInfo;
 import org.apache.pinot.tsdb.spi.series.BaseTimeSeriesBuilder;
-import org.apache.pinot.tsdb.spi.series.TimeSeries;
-import org.apache.pinot.tsdb.spi.series.TimeSeriesBlock;
 import org.apache.pinot.tsdb.spi.series.TimeSeriesBuilderFactory;
 
 
@@ -40,26 +36,16 @@ public class TimeSeriesAggResultsBlockMerger implements 
ResultsBlockMerger<TimeS
 
   @Override
   public void mergeResultsBlocks(TimeSeriesResultsBlock mergedBlock, 
TimeSeriesResultsBlock blockToMerge) {
-    TimeSeriesBlock currentTimeSeriesBlock = mergedBlock.getTimeSeriesBlock();
-    TimeSeriesBlock seriesBlockToMerge = blockToMerge.getTimeSeriesBlock();
-    for (var entry : seriesBlockToMerge.getSeriesMap().entrySet()) {
+    TimeSeriesBuilderBlock currentTimeSeriesBlock = 
mergedBlock.getTimeSeriesBuilderBlock();
+    TimeSeriesBuilderBlock seriesBlockToMerge = 
blockToMerge.getTimeSeriesBuilderBlock();
+    for (var entry : seriesBlockToMerge.getSeriesBuilderMap().entrySet()) {
       long seriesHash = entry.getKey();
-      List<TimeSeries> currentTimeSeriesList = 
currentTimeSeriesBlock.getSeriesMap().get(seriesHash);
-      TimeSeries currentTimeSeries = null;
-      if (currentTimeSeriesList != null && !currentTimeSeriesList.isEmpty()) {
-        currentTimeSeries = currentTimeSeriesList.get(0);
-      }
-      TimeSeries newTimeSeriesToMerge = entry.getValue().get(0);
-      if (currentTimeSeries == null) {
-        List<TimeSeries> newTimeSeriesList = new ArrayList<>();
-        newTimeSeriesList.add(newTimeSeriesToMerge);
-        currentTimeSeriesBlock.getSeriesMap().put(seriesHash, 
newTimeSeriesList);
+      BaseTimeSeriesBuilder currentTimeSeriesBuilder = 
currentTimeSeriesBlock.getSeriesBuilderMap().get(seriesHash);
+      BaseTimeSeriesBuilder newTimeSeriesToMerge = entry.getValue();
+      if (currentTimeSeriesBuilder == null) {
+        currentTimeSeriesBlock.getSeriesBuilderMap().put(seriesHash, 
newTimeSeriesToMerge);
       } else {
-        BaseTimeSeriesBuilder mergedTimeSeriesBuilder = 
_seriesBuilderFactory.newTimeSeriesBuilder(
-            _aggInfo, currentTimeSeries.getId(), 
currentTimeSeries.getTimeBuckets(), currentTimeSeries.getTagNames(),
-            currentTimeSeries.getTagValues());
-        mergedTimeSeriesBuilder.mergeAlignedSeries(newTimeSeriesToMerge);
-        currentTimeSeriesBlock.getSeriesMap().put(seriesHash, 
ImmutableList.of(mergedTimeSeriesBuilder.build()));
+        
currentTimeSeriesBuilder.mergeAlignedSeriesBuilder(newTimeSeriesToMerge);
       }
     }
   }
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/operator/timeseries/TimeSeriesAggregationOperator.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/operator/timeseries/TimeSeriesAggregationOperator.java
index af7acc4f19..93ef05949b 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/operator/timeseries/TimeSeriesAggregationOperator.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/operator/timeseries/TimeSeriesAggregationOperator.java
@@ -19,7 +19,6 @@
 package org.apache.pinot.core.operator.timeseries;
 
 import com.google.common.collect.ImmutableList;
-import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -33,13 +32,13 @@ import org.apache.pinot.core.common.Operator;
 import org.apache.pinot.core.operator.BaseOperator;
 import org.apache.pinot.core.operator.BaseProjectOperator;
 import org.apache.pinot.core.operator.ExecutionStatistics;
+import org.apache.pinot.core.operator.blocks.TimeSeriesBuilderBlock;
 import org.apache.pinot.core.operator.blocks.ValueBlock;
 import org.apache.pinot.core.operator.blocks.results.TimeSeriesResultsBlock;
 import org.apache.pinot.tsdb.spi.AggInfo;
 import org.apache.pinot.tsdb.spi.TimeBuckets;
 import org.apache.pinot.tsdb.spi.series.BaseTimeSeriesBuilder;
 import org.apache.pinot.tsdb.spi.series.TimeSeries;
-import org.apache.pinot.tsdb.spi.series.TimeSeriesBlock;
 import org.apache.pinot.tsdb.spi.series.TimeSeriesBuilderFactory;
 
 
@@ -123,13 +122,7 @@ public class TimeSeriesAggregationOperator extends 
BaseOperator<TimeSeriesResult
         throw new IllegalStateException(
             "Don't yet support value expression of type: " + 
valueExpressionBlockValSet.getValueType());
     }
-    Map<Long, List<TimeSeries>> seriesMap = new HashMap<>();
-    for (var entry : seriesBuilderMap.entrySet()) {
-      List<TimeSeries> seriesList = new ArrayList<>();
-      seriesList.add(entry.getValue().build());
-      seriesMap.put(entry.getKey(), seriesList);
-    }
-    return new TimeSeriesResultsBlock(new TimeSeriesBlock(_timeBuckets, 
seriesMap));
+    return new TimeSeriesResultsBlock(new TimeSeriesBuilderBlock(_timeBuckets, 
seriesBuilderMap));
   }
 
   @Override
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/operator/timeseries/TimeSeriesPassThroughOperator.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/operator/timeseries/TimeSeriesPassThroughOperator.java
deleted file mode 100644
index ea18048342..0000000000
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/operator/timeseries/TimeSeriesPassThroughOperator.java
+++ /dev/null
@@ -1,52 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pinot.core.operator.timeseries;
-
-import java.util.Collections;
-import org.apache.pinot.core.operator.blocks.results.TimeSeriesResultsBlock;
-import org.apache.pinot.core.operator.combine.TimeSeriesCombineOperator;
-import org.apache.pinot.tsdb.spi.operator.BaseTimeSeriesOperator;
-import org.apache.pinot.tsdb.spi.series.TimeSeriesBlock;
-
-
-/**
- * Adapter operator that ties in the Pinot TimeSeriesCombineOperator with the 
Pinot BaseTimeSeriesOperator.
- * This operator explicitly calls the underlying combine operator, unwraps the 
{@link TimeSeriesResultsBlock}, and
- * links it with the rest of the operator chain consisting of {@link 
BaseTimeSeriesOperator}.
- */
-public class TimeSeriesPassThroughOperator extends BaseTimeSeriesOperator {
-  private static final String EXPLAIN_NAME = 
"TIME_SERIES_PASS_THROUGH_OPERATOR";
-  private final TimeSeriesCombineOperator _timeSeriesCombineOperator;
-
-  public TimeSeriesPassThroughOperator(TimeSeriesCombineOperator 
combineOperator) {
-    super(Collections.emptyList());
-    _timeSeriesCombineOperator = combineOperator;
-  }
-
-  @Override
-  public TimeSeriesBlock getNextBlock() {
-    TimeSeriesResultsBlock resultsBlock = (TimeSeriesResultsBlock) 
_timeSeriesCombineOperator.nextBlock();
-    return resultsBlock.getTimeSeriesBlock();
-  }
-
-  @Override
-  public String getExplainName() {
-    return EXPLAIN_NAME;
-  }
-}
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/query/executor/ServerQueryExecutorV1Impl.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/query/executor/ServerQueryExecutorV1Impl.java
index f84ef89878..58d3befb56 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/query/executor/ServerQueryExecutorV1Impl.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/query/executor/ServerQueryExecutorV1Impl.java
@@ -57,7 +57,6 @@ import 
org.apache.pinot.core.operator.blocks.results.BaseResultsBlock;
 import org.apache.pinot.core.operator.blocks.results.ExplainResultsBlock;
 import org.apache.pinot.core.operator.blocks.results.ExplainV2ResultBlock;
 import org.apache.pinot.core.operator.blocks.results.ResultsBlockUtils;
-import org.apache.pinot.core.operator.blocks.results.TimeSeriesResultsBlock;
 import org.apache.pinot.core.plan.ExplainInfo;
 import org.apache.pinot.core.plan.Plan;
 import org.apache.pinot.core.plan.maker.PlanMaker;
@@ -69,7 +68,6 @@ import org.apache.pinot.core.query.request.ServerQueryRequest;
 import org.apache.pinot.core.query.request.context.QueryContext;
 import org.apache.pinot.core.query.request.context.TimerContext;
 import 
org.apache.pinot.core.query.request.context.utils.QueryContextConverterUtils;
-import org.apache.pinot.core.query.request.context.utils.QueryContextUtils;
 import org.apache.pinot.core.query.utils.idset.IdSet;
 import org.apache.pinot.core.util.trace.TraceContext;
 import org.apache.pinot.segment.local.data.manager.SegmentDataManager;
@@ -88,7 +86,6 @@ import org.apache.pinot.spi.exception.QueryCancelledException;
 import org.apache.pinot.spi.plugin.PluginManager;
 import org.apache.pinot.spi.trace.Tracing;
 import org.apache.pinot.spi.utils.builder.TableNameBuilder;
-import org.apache.pinot.tsdb.spi.series.TimeSeriesBlock;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -731,15 +728,7 @@ public class ServerQueryExecutorV1Impl implements 
QueryExecutor {
       List<SegmentContext> selectedSegmentContexts)
       throws TimeoutException {
     if (selectedSegmentContexts.isEmpty()) {
-      if (QueryContextUtils.isTimeSeriesQuery(queryContext)) {
-        // TODO: handle invalid segments
-        TimeSeriesBlock seriesBlock = new TimeSeriesBlock(
-            queryContext.getTimeSeriesContext().getTimeBuckets(), 
Collections.emptyMap());
-        TimeSeriesResultsBlock resultsBlock = new 
TimeSeriesResultsBlock(seriesBlock);
-        return new InstanceResponseBlock(resultsBlock);
-      } else {
-        return new 
InstanceResponseBlock(ResultsBlockUtils.buildEmptyQueryResults(queryContext));
-      }
+      return new 
InstanceResponseBlock(ResultsBlockUtils.buildEmptyQueryResults(queryContext));
     }
     InstanceResponseBlock instanceResponse;
     Plan queryPlan = planCombineQuery(queryContext, timerContext, 
executorService, streamer, selectedSegmentContexts);
diff --git 
a/pinot-core/src/test/java/org/apache/pinot/core/query/executor/QueryExecutorTest.java
 
b/pinot-core/src/test/java/org/apache/pinot/core/query/executor/QueryExecutorTest.java
index 33d83843f6..ff0e0ace8f 100644
--- 
a/pinot-core/src/test/java/org/apache/pinot/core/query/executor/QueryExecutorTest.java
+++ 
b/pinot-core/src/test/java/org/apache/pinot/core/query/executor/QueryExecutorTest.java
@@ -71,6 +71,7 @@ import org.apache.pinot.tsdb.spi.AggInfo;
 import org.apache.pinot.tsdb.spi.TimeBuckets;
 import org.apache.pinot.tsdb.spi.series.SimpleTimeSeriesBuilderFactory;
 import org.apache.pinot.tsdb.spi.series.TimeSeries;
+import org.apache.pinot.tsdb.spi.series.TimeSeriesBlock;
 import org.apache.pinot.tsdb.spi.series.TimeSeriesBuilderFactoryProvider;
 import org.testng.Assert;
 import org.testng.annotations.AfterClass;
@@ -226,7 +227,8 @@ public class QueryExecutorTest {
     InstanceResponseBlock instanceResponse = 
_queryExecutor.execute(serverQueryRequest, QUERY_RUNNERS);
     assertTrue(instanceResponse.getResultsBlock() instanceof 
TimeSeriesResultsBlock);
     TimeSeriesResultsBlock resultsBlock = (TimeSeriesResultsBlock) 
instanceResponse.getResultsBlock();
-    assertEquals(5, resultsBlock.getTimeSeriesBlock().getSeriesMap().size());
+    TimeSeriesBlock timeSeriesBlock = 
resultsBlock.getTimeSeriesBuilderBlock().build();
+    assertEquals(5, timeSeriesBlock.getSeriesMap().size());
   }
 
   @Test
@@ -241,10 +243,11 @@ public class QueryExecutorTest {
     InstanceResponseBlock instanceResponse = 
_queryExecutor.execute(serverQueryRequest, QUERY_RUNNERS);
     assertTrue(instanceResponse.getResultsBlock() instanceof 
TimeSeriesResultsBlock);
     TimeSeriesResultsBlock resultsBlock = (TimeSeriesResultsBlock) 
instanceResponse.getResultsBlock();
-    assertEquals(5, resultsBlock.getTimeSeriesBlock().getSeriesMap().size());
+    TimeSeriesBlock timeSeriesBlock = 
resultsBlock.getTimeSeriesBuilderBlock().build();
+    assertEquals(5, timeSeriesBlock.getSeriesMap().size());
     // For any city, say "New York", the max order item count should be 4
     boolean foundNewYork = false;
-    for (var listOfTimeSeries : 
resultsBlock.getTimeSeriesBlock().getSeriesMap().values()) {
+    for (var listOfTimeSeries : timeSeriesBlock.getSeriesMap().values()) {
       assertEquals(listOfTimeSeries.size(), 1);
       TimeSeries timeSeries = listOfTimeSeries.get(0);
       if (timeSeries.getTagValues()[0].equals("New York")) {
@@ -272,10 +275,11 @@ public class QueryExecutorTest {
     InstanceResponseBlock instanceResponse = 
_queryExecutor.execute(serverQueryRequest, QUERY_RUNNERS);
     assertTrue(instanceResponse.getResultsBlock() instanceof 
TimeSeriesResultsBlock);
     TimeSeriesResultsBlock resultsBlock = (TimeSeriesResultsBlock) 
instanceResponse.getResultsBlock();
-    assertEquals(5, resultsBlock.getTimeSeriesBlock().getSeriesMap().size());
+    TimeSeriesBlock timeSeriesBlock = 
resultsBlock.getTimeSeriesBuilderBlock().build();
+    assertEquals(5, timeSeriesBlock.getSeriesMap().size());
     // For any city, say "Chicago", the min order item count should be 0
     boolean foundChicago = false;
-    for (var listOfTimeSeries : 
resultsBlock.getTimeSeriesBlock().getSeriesMap().values()) {
+    for (var listOfTimeSeries : timeSeriesBlock.getSeriesMap().values()) {
       assertEquals(listOfTimeSeries.size(), 1);
       TimeSeries timeSeries = listOfTimeSeries.get(0);
       if (timeSeries.getTagValues()[0].equals("Chicago")) {
diff --git 
a/pinot-plugins/pinot-timeseries-lang/pinot-timeseries-m3ql/src/main/java/org/apache/pinot/tsdb/m3ql/M3TimeSeriesPlanner.java
 
b/pinot-plugins/pinot-timeseries-lang/pinot-timeseries-m3ql/src/main/java/org/apache/pinot/tsdb/m3ql/M3TimeSeriesPlanner.java
index f355e464eb..0d0254128f 100644
--- 
a/pinot-plugins/pinot-timeseries-lang/pinot-timeseries-m3ql/src/main/java/org/apache/pinot/tsdb/m3ql/M3TimeSeriesPlanner.java
+++ 
b/pinot-plugins/pinot-timeseries-lang/pinot-timeseries-m3ql/src/main/java/org/apache/pinot/tsdb/m3ql/M3TimeSeriesPlanner.java
@@ -22,10 +22,10 @@ import com.google.common.base.Preconditions;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Locale;
-import java.util.Map;
 import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
+import org.apache.pinot.spi.env.PinotConfiguration;
 import org.apache.pinot.tsdb.m3ql.parser.Tokenizer;
 import org.apache.pinot.tsdb.m3ql.plan.KeepLastValuePlanNode;
 import org.apache.pinot.tsdb.m3ql.plan.TransformNullPlanNode;
@@ -36,12 +36,12 @@ import org.apache.pinot.tsdb.spi.TimeBuckets;
 import org.apache.pinot.tsdb.spi.TimeSeriesLogicalPlanResult;
 import org.apache.pinot.tsdb.spi.TimeSeriesLogicalPlanner;
 import org.apache.pinot.tsdb.spi.plan.BaseTimeSeriesPlanNode;
-import org.apache.pinot.tsdb.spi.plan.ScanFilterAndProjectPlanNode;
+import org.apache.pinot.tsdb.spi.plan.LeafTimeSeriesPlanNode;
 
 
 public class M3TimeSeriesPlanner implements TimeSeriesLogicalPlanner {
   @Override
-  public void init(Map<String, Object> config) {
+  public void init(PinotConfiguration pinotConfiguration) {
   }
 
   @Override
@@ -152,7 +152,7 @@ public class M3TimeSeriesPlanner implements 
TimeSeriesLogicalPlanner {
     Preconditions.checkNotNull(timeColumn, "Time column not set. Set via 
time_col=");
     Preconditions.checkNotNull(timeUnit, "Time unit not set. Set via 
time_unit=");
     Preconditions.checkNotNull(valueExpr, "Value expression not set. Set via 
value=");
-    return new ScanFilterAndProjectPlanNode(planId, children, tableName, 
timeColumn, timeUnit, 0L, filter, valueExpr,
+    return new LeafTimeSeriesPlanNode(planId, children, tableName, timeColumn, 
timeUnit, 0L, filter, valueExpr,
         aggInfo, groupByColumns);
   }
 }
diff --git 
a/pinot-plugins/pinot-timeseries-lang/pinot-timeseries-m3ql/src/main/java/org/apache/pinot/tsdb/m3ql/time/TimeBucketComputer.java
 
b/pinot-plugins/pinot-timeseries-lang/pinot-timeseries-m3ql/src/main/java/org/apache/pinot/tsdb/m3ql/time/TimeBucketComputer.java
index d79d5c6a8c..a7353ee6c8 100644
--- 
a/pinot-plugins/pinot-timeseries-lang/pinot-timeseries-m3ql/src/main/java/org/apache/pinot/tsdb/m3ql/time/TimeBucketComputer.java
+++ 
b/pinot-plugins/pinot-timeseries-lang/pinot-timeseries-m3ql/src/main/java/org/apache/pinot/tsdb/m3ql/time/TimeBucketComputer.java
@@ -23,7 +23,7 @@ import java.util.Collection;
 import org.apache.pinot.tsdb.spi.RangeTimeSeriesRequest;
 import org.apache.pinot.tsdb.spi.TimeBuckets;
 import org.apache.pinot.tsdb.spi.plan.BaseTimeSeriesPlanNode;
-import org.apache.pinot.tsdb.spi.plan.ScanFilterAndProjectPlanNode;
+import org.apache.pinot.tsdb.spi.plan.LeafTimeSeriesPlanNode;
 
 
 public class TimeBucketComputer {
@@ -42,7 +42,7 @@ public class TimeBucketComputer {
   }
 
   public static QueryTimeBoundaryConstraints process(BaseTimeSeriesPlanNode 
planNode, RangeTimeSeriesRequest request) {
-    if (planNode instanceof ScanFilterAndProjectPlanNode) {
+    if (planNode instanceof LeafTimeSeriesPlanNode) {
       QueryTimeBoundaryConstraints constraints = new 
QueryTimeBoundaryConstraints();
       constraints.getDivisors().add(request.getStepSeconds());
       return constraints;
diff --git 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/timeseries/LeafTimeSeriesOperator.java
 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/timeseries/LeafTimeSeriesOperator.java
index 74da11c5b8..ca119ebf1d 100644
--- 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/timeseries/LeafTimeSeriesOperator.java
+++ 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/timeseries/LeafTimeSeriesOperator.java
@@ -53,7 +53,9 @@ public class LeafTimeSeriesOperator extends 
BaseTimeSeriesOperator {
       String oneException = 
instanceResponseBlock.getExceptions().values().iterator().next();
       throw new RuntimeException(oneException);
     }
-    return ((TimeSeriesResultsBlock) 
instanceResponseBlock.getResultsBlock()).getTimeSeriesBlock();
+    TimeSeriesResultsBlock timeSeriesResultsBlock =
+        ((TimeSeriesResultsBlock) instanceResponseBlock.getResultsBlock());
+    return timeSeriesResultsBlock.getTimeSeriesBuilderBlock().build();
   }
 
   @Override
diff --git 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/timeseries/PhysicalTimeSeriesPlanVisitor.java
 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/timeseries/PhysicalTimeSeriesPlanVisitor.java
index 258e41c51e..5e42d1b4b5 100644
--- 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/timeseries/PhysicalTimeSeriesPlanVisitor.java
+++ 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/timeseries/PhysicalTimeSeriesPlanVisitor.java
@@ -33,7 +33,7 @@ import 
org.apache.pinot.core.query.request.context.QueryContext;
 import org.apache.pinot.sql.parsers.CalciteSqlParser;
 import org.apache.pinot.tsdb.spi.operator.BaseTimeSeriesOperator;
 import org.apache.pinot.tsdb.spi.plan.BaseTimeSeriesPlanNode;
-import org.apache.pinot.tsdb.spi.plan.ScanFilterAndProjectPlanNode;
+import org.apache.pinot.tsdb.spi.plan.LeafTimeSeriesPlanNode;
 
 
 public class PhysicalTimeSeriesPlanVisitor {
@@ -62,8 +62,8 @@ public class PhysicalTimeSeriesPlanVisitor {
   public void initLeafPlanNode(BaseTimeSeriesPlanNode planNode, 
TimeSeriesExecutionContext context) {
     for (int index = 0; index < planNode.getChildren().size(); index++) {
       BaseTimeSeriesPlanNode childNode = planNode.getChildren().get(index);
-      if (childNode instanceof ScanFilterAndProjectPlanNode) {
-        ScanFilterAndProjectPlanNode sfpNode = (ScanFilterAndProjectPlanNode) 
childNode;
+      if (childNode instanceof LeafTimeSeriesPlanNode) {
+        LeafTimeSeriesPlanNode sfpNode = (LeafTimeSeriesPlanNode) childNode;
         List<String> segments = 
context.getPlanIdToSegmentsMap().get(sfpNode.getId());
         ServerQueryRequest serverQueryRequest = 
compileLeafServerQueryRequest(sfpNode, segments, context);
         TimeSeriesPhysicalTableScan physicalTableScan = new 
TimeSeriesPhysicalTableScan(childNode.getId(),
@@ -75,13 +75,13 @@ public class PhysicalTimeSeriesPlanVisitor {
     }
   }
 
-  public ServerQueryRequest 
compileLeafServerQueryRequest(ScanFilterAndProjectPlanNode sfpNode, 
List<String> segments,
+  public ServerQueryRequest 
compileLeafServerQueryRequest(LeafTimeSeriesPlanNode sfpNode, List<String> 
segments,
       TimeSeriesExecutionContext context) {
     return new ServerQueryRequest(compileQueryContext(sfpNode, context),
         segments, /* TODO: Pass metadata from request */ 
Collections.emptyMap(), _serverMetrics);
   }
 
-  public QueryContext compileQueryContext(ScanFilterAndProjectPlanNode 
sfpNode, TimeSeriesExecutionContext context) {
+  public QueryContext compileQueryContext(LeafTimeSeriesPlanNode sfpNode, 
TimeSeriesExecutionContext context) {
     FilterContext filterContext =
         RequestContextUtils.getFilter(CalciteSqlParser.compileToExpression(
             sfpNode.getEffectiveFilter(context.getInitialTimeBuckets())));
diff --git 
a/pinot-timeseries/pinot-timeseries-planner/src/main/java/org/apache/pinot/tsdb/planner/TimeSeriesQueryEnvironment.java
 
b/pinot-timeseries/pinot-timeseries-planner/src/main/java/org/apache/pinot/tsdb/planner/TimeSeriesQueryEnvironment.java
index dada38f6f3..493efb2361 100644
--- 
a/pinot-timeseries/pinot-timeseries-planner/src/main/java/org/apache/pinot/tsdb/planner/TimeSeriesQueryEnvironment.java
+++ 
b/pinot-timeseries/pinot-timeseries-planner/src/main/java/org/apache/pinot/tsdb/planner/TimeSeriesQueryEnvironment.java
@@ -43,7 +43,7 @@ import org.apache.pinot.tsdb.spi.RangeTimeSeriesRequest;
 import org.apache.pinot.tsdb.spi.TimeSeriesLogicalPlanResult;
 import org.apache.pinot.tsdb.spi.TimeSeriesLogicalPlanner;
 import org.apache.pinot.tsdb.spi.plan.BaseTimeSeriesPlanNode;
-import org.apache.pinot.tsdb.spi.plan.ScanFilterAndProjectPlanNode;
+import org.apache.pinot.tsdb.spi.plan.LeafTimeSeriesPlanNode;
 import org.apache.pinot.tsdb.spi.plan.serde.TimeSeriesPlanSerde;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -74,7 +74,7 @@ public class TimeSeriesQueryEnvironment {
         Class<?> klass = 
TimeSeriesQueryEnvironment.class.getClassLoader().loadClass(klassName);
         Constructor<?> constructor = klass.getConstructor();
         TimeSeriesLogicalPlanner planner = (TimeSeriesLogicalPlanner) 
constructor.newInstance();
-        planner.init(config.subset(configPrefix).toMap());
+        planner.init(config.subset(configPrefix));
         _plannerMap.put(language, planner);
       } catch (Exception e) {
         throw new RuntimeException("Failed to instantiate logical planner for 
language: " + language, e);
@@ -121,8 +121,8 @@ public class TimeSeriesQueryEnvironment {
   }
 
   public static void findTableNames(BaseTimeSeriesPlanNode planNode, 
Consumer<String> tableNameConsumer) {
-    if (planNode instanceof ScanFilterAndProjectPlanNode) {
-      ScanFilterAndProjectPlanNode scanNode = (ScanFilterAndProjectPlanNode) 
planNode;
+    if (planNode instanceof LeafTimeSeriesPlanNode) {
+      LeafTimeSeriesPlanNode scanNode = (LeafTimeSeriesPlanNode) planNode;
       tableNameConsumer.accept(scanNode.getTableName());
       return;
     }
diff --git 
a/pinot-timeseries/pinot-timeseries-planner/src/main/java/org/apache/pinot/tsdb/planner/physical/TableScanVisitor.java
 
b/pinot-timeseries/pinot-timeseries-planner/src/main/java/org/apache/pinot/tsdb/planner/physical/TableScanVisitor.java
index 58eccd2de0..99728e0f6d 100644
--- 
a/pinot-timeseries/pinot-timeseries-planner/src/main/java/org/apache/pinot/tsdb/planner/physical/TableScanVisitor.java
+++ 
b/pinot-timeseries/pinot-timeseries-planner/src/main/java/org/apache/pinot/tsdb/planner/physical/TableScanVisitor.java
@@ -32,7 +32,7 @@ import org.apache.pinot.core.routing.RoutingTable;
 import org.apache.pinot.sql.parsers.CalciteSqlParser;
 import org.apache.pinot.tsdb.spi.TimeBuckets;
 import org.apache.pinot.tsdb.spi.plan.BaseTimeSeriesPlanNode;
-import org.apache.pinot.tsdb.spi.plan.ScanFilterAndProjectPlanNode;
+import org.apache.pinot.tsdb.spi.plan.LeafTimeSeriesPlanNode;
 
 
 public class TableScanVisitor {
@@ -47,8 +47,8 @@ public class TableScanVisitor {
   }
 
   public void assignSegmentsToPlan(BaseTimeSeriesPlanNode planNode, 
TimeBuckets timeBuckets, Context context) {
-    if (planNode instanceof ScanFilterAndProjectPlanNode) {
-      ScanFilterAndProjectPlanNode sfpNode = (ScanFilterAndProjectPlanNode) 
planNode;
+    if (planNode instanceof LeafTimeSeriesPlanNode) {
+      LeafTimeSeriesPlanNode sfpNode = (LeafTimeSeriesPlanNode) planNode;
       Expression filterExpression = 
CalciteSqlParser.compileToExpression(sfpNode.getEffectiveFilter(timeBuckets));
       RoutingTable routingTable = _routingManager.getRoutingTable(
           compileBrokerRequest(sfpNode.getTableName(), filterExpression),
diff --git 
a/pinot-timeseries/pinot-timeseries-spi/src/main/java/org/apache/pinot/tsdb/spi/TimeSeriesLogicalPlanner.java
 
b/pinot-timeseries/pinot-timeseries-spi/src/main/java/org/apache/pinot/tsdb/spi/TimeSeriesLogicalPlanner.java
index 0c7e724ca8..b5004ff424 100644
--- 
a/pinot-timeseries/pinot-timeseries-spi/src/main/java/org/apache/pinot/tsdb/spi/TimeSeriesLogicalPlanner.java
+++ 
b/pinot-timeseries/pinot-timeseries-spi/src/main/java/org/apache/pinot/tsdb/spi/TimeSeriesLogicalPlanner.java
@@ -18,9 +18,9 @@
  */
 package org.apache.pinot.tsdb.spi;
 
-import java.util.Map;
+import org.apache.pinot.spi.env.PinotConfiguration;
 import org.apache.pinot.tsdb.spi.plan.BaseTimeSeriesPlanNode;
-import org.apache.pinot.tsdb.spi.plan.ScanFilterAndProjectPlanNode;
+import org.apache.pinot.tsdb.spi.plan.LeafTimeSeriesPlanNode;
 
 
 /**
@@ -30,10 +30,10 @@ import 
org.apache.pinot.tsdb.spi.plan.ScanFilterAndProjectPlanNode;
  * {@link BaseTimeSeriesPlanNode}. Other than the plan-tree, the planner also returns a {@link TimeBuckets} which is
  * the default TimeBuckets used by the query operators at runtime. Implementations are free to adjust them as they see
  * fit. For instance, one query language might want to extend to the left or right of the time-range based on certain
- * operators. Also, see {@link ScanFilterAndProjectPlanNode#getEffectiveFilter(TimeBuckets)}.
+ * operators. Also, see {@link LeafTimeSeriesPlanNode#getEffectiveFilter(TimeBuckets)}.
  */
 public interface TimeSeriesLogicalPlanner {
-  void init(Map<String, Object> config);
+  void init(PinotConfiguration pinotConfiguration);
 
   TimeSeriesLogicalPlanResult plan(RangeTimeSeriesRequest request);
 }
diff --git 
a/pinot-timeseries/pinot-timeseries-spi/src/main/java/org/apache/pinot/tsdb/spi/plan/ScanFilterAndProjectPlanNode.java
 
b/pinot-timeseries/pinot-timeseries-spi/src/main/java/org/apache/pinot/tsdb/spi/plan/LeafTimeSeriesPlanNode.java
similarity index 94%
rename from 
pinot-timeseries/pinot-timeseries-spi/src/main/java/org/apache/pinot/tsdb/spi/plan/ScanFilterAndProjectPlanNode.java
rename to 
pinot-timeseries/pinot-timeseries-spi/src/main/java/org/apache/pinot/tsdb/spi/plan/LeafTimeSeriesPlanNode.java
index e2a0a15f27..18d6316776 100644
--- 
a/pinot-timeseries/pinot-timeseries-spi/src/main/java/org/apache/pinot/tsdb/spi/plan/ScanFilterAndProjectPlanNode.java
+++ 
b/pinot-timeseries/pinot-timeseries-spi/src/main/java/org/apache/pinot/tsdb/spi/plan/LeafTimeSeriesPlanNode.java
@@ -35,8 +35,8 @@ import 
org.apache.pinot.tsdb.spi.operator.BaseTimeSeriesOperator;
  * <b>Note:</b> You don't need to pass the time-filter to the filter 
expression, since Pinot will automatically compute
  *   the time filter based on the computed time buckets in {@link 
TimeSeriesLogicalPlanner}.
  */
-public class ScanFilterAndProjectPlanNode extends BaseTimeSeriesPlanNode {
-  private static final String EXPLAIN_NAME = "SCAN_FILTER_AND_PROJECT";
+public class LeafTimeSeriesPlanNode extends BaseTimeSeriesPlanNode {
+  private static final String EXPLAIN_NAME = "LEAF_TIME_SERIES_PLAN_NODE";
   private final String _tableName;
   private final String _timeColumn;
   private final TimeUnit _timeUnit;
@@ -47,7 +47,7 @@ public class ScanFilterAndProjectPlanNode extends 
BaseTimeSeriesPlanNode {
   private final List<String> _groupByColumns;
 
   @JsonCreator
-  public ScanFilterAndProjectPlanNode(
+  public LeafTimeSeriesPlanNode(
       @JsonProperty("id") String id, @JsonProperty("children") 
List<BaseTimeSeriesPlanNode> children,
       @JsonProperty("tableName") String tableName, @JsonProperty("timeColumn") 
String timeColumn,
       @JsonProperty("timeUnit") TimeUnit timeUnit, @JsonProperty("offset") 
Long offset,
@@ -68,7 +68,7 @@ public class ScanFilterAndProjectPlanNode extends 
BaseTimeSeriesPlanNode {
 
   @Override
   public String getKlass() {
-    return ScanFilterAndProjectPlanNode.class.getName();
+    return LeafTimeSeriesPlanNode.class.getName();
   }
 
   @Override
diff --git 
a/pinot-timeseries/pinot-timeseries-spi/src/main/java/org/apache/pinot/tsdb/spi/plan/serde/TimeSeriesPlanSerde.java
 
b/pinot-timeseries/pinot-timeseries-spi/src/main/java/org/apache/pinot/tsdb/spi/plan/serde/TimeSeriesPlanSerde.java
index e4e036e1ff..1e7775ff5a 100644
--- 
a/pinot-timeseries/pinot-timeseries-spi/src/main/java/org/apache/pinot/tsdb/spi/plan/serde/TimeSeriesPlanSerde.java
+++ 
b/pinot-timeseries/pinot-timeseries-spi/src/main/java/org/apache/pinot/tsdb/spi/plan/serde/TimeSeriesPlanSerde.java
@@ -26,12 +26,12 @@ import com.fasterxml.jackson.databind.node.ArrayNode;
 import com.fasterxml.jackson.databind.node.ObjectNode;
 import org.apache.pinot.spi.annotations.InterfaceStability;
 import org.apache.pinot.tsdb.spi.plan.BaseTimeSeriesPlanNode;
-import org.apache.pinot.tsdb.spi.plan.ScanFilterAndProjectPlanNode;
+import org.apache.pinot.tsdb.spi.plan.LeafTimeSeriesPlanNode;
 
 
 /**
  * We have implemented a custom serialization/deserialization mechanism for 
time series plans. This allows users to
- * use Jackson to annotate their plan nodes as shown in {@link 
ScanFilterAndProjectPlanNode}, which is used for
+ * use Jackson to annotate their plan nodes as shown in {@link 
LeafTimeSeriesPlanNode}, which is used for
  * plan serde for broker/server communication.
  * TODO: There are limitations to this and we will change this soon. Issues:
  *   1. Pinot TS SPI is compiled in Pinot distribution and Jackson deps get 
shaded usually.
diff --git 
a/pinot-timeseries/pinot-timeseries-spi/src/main/java/org/apache/pinot/tsdb/spi/series/BaseTimeSeriesBuilder.java
 
b/pinot-timeseries/pinot-timeseries-spi/src/main/java/org/apache/pinot/tsdb/spi/series/BaseTimeSeriesBuilder.java
index 3509e7cfcd..538f921b55 100644
--- 
a/pinot-timeseries/pinot-timeseries-spi/src/main/java/org/apache/pinot/tsdb/spi/series/BaseTimeSeriesBuilder.java
+++ 
b/pinot-timeseries/pinot-timeseries-spi/src/main/java/org/apache/pinot/tsdb/spi/series/BaseTimeSeriesBuilder.java
@@ -71,5 +71,14 @@ public abstract class BaseTimeSeriesBuilder {
     }
   }
 
+  /**
+   * Adds an un-built series-builder to this builder. Implementations may want to override this method, especially for
+   * complex aggregations, where the series builder accumulates results in a complex object. (e.g. percentile)
+   */
+  public void mergeAlignedSeriesBuilder(BaseTimeSeriesBuilder builder) {
+    TimeSeries timeSeries = builder.build();
+    mergeAlignedSeries(timeSeries);
+  }
+
   public abstract TimeSeries build();
 }
diff --git 
a/pinot-timeseries/pinot-timeseries-spi/src/test/java/org/apache/pinot/tsdb/spi/plan/serde/TimeSeriesPlanSerdeTest.java
 
b/pinot-timeseries/pinot-timeseries-spi/src/test/java/org/apache/pinot/tsdb/spi/plan/serde/TimeSeriesPlanSerdeTest.java
index ff74b6ef35..a5015dc991 100644
--- 
a/pinot-timeseries/pinot-timeseries-spi/src/test/java/org/apache/pinot/tsdb/spi/plan/serde/TimeSeriesPlanSerdeTest.java
+++ 
b/pinot-timeseries/pinot-timeseries-spi/src/test/java/org/apache/pinot/tsdb/spi/plan/serde/TimeSeriesPlanSerdeTest.java
@@ -22,7 +22,7 @@ import java.util.ArrayList;
 import java.util.concurrent.TimeUnit;
 import org.apache.pinot.tsdb.spi.AggInfo;
 import org.apache.pinot.tsdb.spi.plan.BaseTimeSeriesPlanNode;
-import org.apache.pinot.tsdb.spi.plan.ScanFilterAndProjectPlanNode;
+import org.apache.pinot.tsdb.spi.plan.LeafTimeSeriesPlanNode;
 import org.testng.annotations.Test;
 
 import static org.testng.Assert.*;
@@ -31,15 +31,15 @@ import static org.testng.Assert.*;
 public class TimeSeriesPlanSerdeTest {
   @Test
   public void testSerdeForScanFilterProjectNode() {
-    ScanFilterAndProjectPlanNode scanFilterAndProjectPlanNode = new 
ScanFilterAndProjectPlanNode(
+    LeafTimeSeriesPlanNode leafTimeSeriesPlanNode = new LeafTimeSeriesPlanNode(
         "sfp#0", new ArrayList<>(), "myTable", "myTimeColumn", 
TimeUnit.MILLISECONDS,
         0L, "myFilterExpression", "myValueExpression",
         new AggInfo("SUM"), new ArrayList<>()
     );
     BaseTimeSeriesPlanNode planNode =
-        
TimeSeriesPlanSerde.deserialize(TimeSeriesPlanSerde.serialize(scanFilterAndProjectPlanNode));
-    assertTrue(planNode instanceof ScanFilterAndProjectPlanNode);
-    ScanFilterAndProjectPlanNode deserializedNode = 
(ScanFilterAndProjectPlanNode) planNode;
+        
TimeSeriesPlanSerde.deserialize(TimeSeriesPlanSerde.serialize(leafTimeSeriesPlanNode));
+    assertTrue(planNode instanceof LeafTimeSeriesPlanNode);
+    LeafTimeSeriesPlanNode deserializedNode = (LeafTimeSeriesPlanNode) 
planNode;
     assertEquals(deserializedNode.getTableName(), "myTable");
     assertEquals(deserializedNode.getTimeColumn(), "myTimeColumn");
     assertEquals(deserializedNode.getTimeUnit(), TimeUnit.MILLISECONDS);


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org


Reply via email to