This is an automated email from the ASF dual-hosted git repository. ankitsultana pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push: new 4709954097 Part-4: Remove Unnecessary TimeSeries Materialization + Minor Cleanups (#14092) 4709954097 is described below commit 4709954097703f88dbd419ab0436a4c1356e9c85 Author: Ankit Sultana <ankitsult...@uber.com> AuthorDate: Fri Sep 27 16:56:39 2024 +0530 Part-4: Remove Unnecessary TimeSeries Materialization + Minor Cleanups (#14092) --- .../broker/api/resources/PinotClientRequest.java | 2 +- .../operator/blocks/TimeSeriesBuilderBlock.java | 61 ++++++++++++++++++++++ .../operator/blocks/results/ResultsBlockUtils.java | 13 +++++ .../blocks/results/TimeSeriesResultsBlock.java | 12 ++--- .../merger/TimeSeriesAggResultsBlockMerger.java | 32 ++++-------- .../timeseries/TimeSeriesAggregationOperator.java | 11 +--- .../timeseries/TimeSeriesPassThroughOperator.java | 52 ------------------ .../query/executor/ServerQueryExecutorV1Impl.java | 13 +---- .../core/query/executor/QueryExecutorTest.java | 14 +++-- .../pinot/tsdb/m3ql/M3TimeSeriesPlanner.java | 8 +-- .../pinot/tsdb/m3ql/time/TimeBucketComputer.java | 4 +- .../runtime/timeseries/LeafTimeSeriesOperator.java | 4 +- .../timeseries/PhysicalTimeSeriesPlanVisitor.java | 10 ++-- .../tsdb/planner/TimeSeriesQueryEnvironment.java | 8 +-- .../tsdb/planner/physical/TableScanVisitor.java | 6 +-- .../pinot/tsdb/spi/TimeSeriesLogicalPlanner.java | 8 +-- ...ctPlanNode.java => LeafTimeSeriesPlanNode.java} | 8 +-- .../tsdb/spi/plan/serde/TimeSeriesPlanSerde.java | 4 +- .../tsdb/spi/series/BaseTimeSeriesBuilder.java | 9 ++++ .../spi/plan/serde/TimeSeriesPlanSerdeTest.java | 10 ++-- 20 files changed, 147 insertions(+), 142 deletions(-) diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/api/resources/PinotClientRequest.java b/pinot-broker/src/main/java/org/apache/pinot/broker/api/resources/PinotClientRequest.java index bb9e36f204..0d0d2ee5b4 100644 --- a/pinot-broker/src/main/java/org/apache/pinot/broker/api/resources/PinotClientRequest.java +++ b/pinot-broker/src/main/java/org/apache/pinot/broker/api/resources/PinotClientRequest.java @@ -259,7 +259,7 @@ public class PinotClientRequest { asyncResponse.resume(response); } } catch (Exception e) { - LOGGER.error("Caught exception while processing POST request", e); + LOGGER.error("Caught exception while processing GET request", e); _brokerMetrics.addMeteredGlobalValue(BrokerMeter.UNCAUGHT_POST_EXCEPTIONS, 1L); asyncResponse.resume(Response.serverError().entity( new PinotBrokerTimeSeriesResponse("error", null, e.getClass().getSimpleName(), e.getMessage())) diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/blocks/TimeSeriesBuilderBlock.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/blocks/TimeSeriesBuilderBlock.java new file mode 100644 index 0000000000..0f9eda897d --- /dev/null +++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/blocks/TimeSeriesBuilderBlock.java @@ -0,0 +1,61 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.core.operator.blocks; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import org.apache.pinot.core.operator.combine.merger.TimeSeriesAggResultsBlockMerger; +import org.apache.pinot.tsdb.spi.TimeBuckets; +import org.apache.pinot.tsdb.spi.series.BaseTimeSeriesBuilder; +import org.apache.pinot.tsdb.spi.series.TimeSeries; +import org.apache.pinot.tsdb.spi.series.TimeSeriesBlock; + + +/** + * Block used by the {@link TimeSeriesAggResultsBlockMerger}. + */ +public class TimeSeriesBuilderBlock { + private final TimeBuckets _timeBuckets; + private final Map<Long, BaseTimeSeriesBuilder> _seriesBuilderMap; + + public TimeSeriesBuilderBlock(TimeBuckets timeBuckets, Map<Long, BaseTimeSeriesBuilder> seriesBuilderMap) { + _timeBuckets = timeBuckets; + _seriesBuilderMap = seriesBuilderMap; + } + + public TimeBuckets getTimeBuckets() { + return _timeBuckets; + } + + public Map<Long, BaseTimeSeriesBuilder> getSeriesBuilderMap() { + return _seriesBuilderMap; + } + + public TimeSeriesBlock build() { + Map<Long, List<TimeSeries>> seriesMap = new HashMap<>(); + for (var entry : _seriesBuilderMap.entrySet()) { + List<TimeSeries> result = new ArrayList<>(1); + result.add(entry.getValue().build()); + seriesMap.put(entry.getKey(), result); + } + return new TimeSeriesBlock(_timeBuckets, seriesMap); + } +} diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/blocks/results/ResultsBlockUtils.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/blocks/results/ResultsBlockUtils.java index 9775d04d1c..5969053755 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/operator/blocks/results/ResultsBlockUtils.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/blocks/results/ResultsBlockUtils.java @@ -18,6 +18,7 @@ */ package org.apache.pinot.core.operator.blocks.results; +import com.google.common.base.Preconditions; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; @@ -25,8 +26,10 @@ import java.util.List; import org.apache.commons.lang3.tuple.Pair; import org.apache.pinot.common.request.context.ExpressionContext; import org.apache.pinot.common.request.context.FilterContext; +import org.apache.pinot.common.request.context.TimeSeriesContext; import org.apache.pinot.common.utils.DataSchema; import org.apache.pinot.common.utils.DataSchema.ColumnDataType; +import org.apache.pinot.core.operator.blocks.TimeSeriesBuilderBlock; import org.apache.pinot.core.query.aggregation.function.AggregationFunction; import org.apache.pinot.core.query.aggregation.function.AggregationFunctionUtils; import org.apache.pinot.core.query.distinct.DistinctTable; @@ -40,6 +43,9 @@ public class ResultsBlockUtils { } public static BaseResultsBlock buildEmptyQueryResults(QueryContext queryContext) { + if (QueryContextUtils.isTimeSeriesQuery(queryContext)) { + return buildEmptyTimeSeriesResults(queryContext); + } if (QueryContextUtils.isSelectionQuery(queryContext)) { return buildEmptySelectionQueryResults(queryContext); } @@ -117,4 +123,11 @@ public class ResultsBlockUtils { queryContext.isNullHandlingEnabled()); return new DistinctResultsBlock(distinctTable, queryContext); } + + private static TimeSeriesResultsBlock buildEmptyTimeSeriesResults(QueryContext queryContext) { + TimeSeriesContext timeSeriesContext = queryContext.getTimeSeriesContext(); + Preconditions.checkNotNull(timeSeriesContext); + return new TimeSeriesResultsBlock( + new TimeSeriesBuilderBlock(timeSeriesContext.getTimeBuckets(), Collections.emptyMap())); + } } diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/blocks/results/TimeSeriesResultsBlock.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/blocks/results/TimeSeriesResultsBlock.java index 085d558d0f..30a66bd624 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/operator/blocks/results/TimeSeriesResultsBlock.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/blocks/results/TimeSeriesResultsBlock.java @@ -23,15 +23,15 @@ import java.util.List; import javax.annotation.Nullable; import org.apache.pinot.common.datatable.DataTable; import org.apache.pinot.common.utils.DataSchema; +import org.apache.pinot.core.operator.blocks.TimeSeriesBuilderBlock; import org.apache.pinot.core.query.request.context.QueryContext; -import org.apache.pinot.tsdb.spi.series.TimeSeriesBlock; public class TimeSeriesResultsBlock extends BaseResultsBlock { - private final TimeSeriesBlock _seriesBlock; + private final TimeSeriesBuilderBlock _timeSeriesBuilderBlock; - public TimeSeriesResultsBlock(TimeSeriesBlock seriesBlock) { - _seriesBlock = seriesBlock; + public TimeSeriesResultsBlock(TimeSeriesBuilderBlock timeSeriesBuilderBlock) { + _timeSeriesBuilderBlock = timeSeriesBuilderBlock; } @Override @@ -66,7 +66,7 @@ public class TimeSeriesResultsBlock extends BaseResultsBlock { return null; } - public TimeSeriesBlock getTimeSeriesBlock() { - return _seriesBlock; + public TimeSeriesBuilderBlock getTimeSeriesBuilderBlock() { + return _timeSeriesBuilderBlock; } } diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/merger/TimeSeriesAggResultsBlockMerger.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/merger/TimeSeriesAggResultsBlockMerger.java index 283073c9ce..17f22a1737 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/merger/TimeSeriesAggResultsBlockMerger.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/merger/TimeSeriesAggResultsBlockMerger.java @@ -18,14 +18,10 @@ */ package org.apache.pinot.core.operator.combine.merger; -import com.google.common.collect.ImmutableList; -import java.util.ArrayList; -import java.util.List; +import org.apache.pinot.core.operator.blocks.TimeSeriesBuilderBlock; import org.apache.pinot.core.operator.blocks.results.TimeSeriesResultsBlock; import org.apache.pinot.tsdb.spi.AggInfo; import org.apache.pinot.tsdb.spi.series.BaseTimeSeriesBuilder; -import org.apache.pinot.tsdb.spi.series.TimeSeries; -import org.apache.pinot.tsdb.spi.series.TimeSeriesBlock; import org.apache.pinot.tsdb.spi.series.TimeSeriesBuilderFactory; @@ -40,26 +36,16 @@ public class TimeSeriesAggResultsBlockMerger implements ResultsBlockMerger<TimeS @Override public void mergeResultsBlocks(TimeSeriesResultsBlock mergedBlock, TimeSeriesResultsBlock blockToMerge) { - TimeSeriesBlock currentTimeSeriesBlock = mergedBlock.getTimeSeriesBlock(); - TimeSeriesBlock seriesBlockToMerge = blockToMerge.getTimeSeriesBlock(); - for (var entry : seriesBlockToMerge.getSeriesMap().entrySet()) { + TimeSeriesBuilderBlock currentTimeSeriesBlock = mergedBlock.getTimeSeriesBuilderBlock(); + TimeSeriesBuilderBlock seriesBlockToMerge = blockToMerge.getTimeSeriesBuilderBlock(); + for (var entry : seriesBlockToMerge.getSeriesBuilderMap().entrySet()) { long seriesHash = entry.getKey(); - List<TimeSeries> currentTimeSeriesList = currentTimeSeriesBlock.getSeriesMap().get(seriesHash); - TimeSeries currentTimeSeries = null; - if (currentTimeSeriesList != null && !currentTimeSeriesList.isEmpty()) { - currentTimeSeries = currentTimeSeriesList.get(0); - } - TimeSeries newTimeSeriesToMerge = entry.getValue().get(0); - if (currentTimeSeries == null) { - List<TimeSeries> newTimeSeriesList = new ArrayList<>(); - newTimeSeriesList.add(newTimeSeriesToMerge); - currentTimeSeriesBlock.getSeriesMap().put(seriesHash, newTimeSeriesList); + BaseTimeSeriesBuilder currentTimeSeriesBuilder = currentTimeSeriesBlock.getSeriesBuilderMap().get(seriesHash); + BaseTimeSeriesBuilder newTimeSeriesToMerge = entry.getValue(); + if (currentTimeSeriesBuilder == null) { + currentTimeSeriesBlock.getSeriesBuilderMap().put(seriesHash, newTimeSeriesToMerge); } else { - BaseTimeSeriesBuilder mergedTimeSeriesBuilder = _seriesBuilderFactory.newTimeSeriesBuilder( - _aggInfo, currentTimeSeries.getId(), currentTimeSeries.getTimeBuckets(), currentTimeSeries.getTagNames(), - currentTimeSeries.getTagValues()); - mergedTimeSeriesBuilder.mergeAlignedSeries(newTimeSeriesToMerge); - currentTimeSeriesBlock.getSeriesMap().put(seriesHash, ImmutableList.of(mergedTimeSeriesBuilder.build())); + currentTimeSeriesBuilder.mergeAlignedSeriesBuilder(newTimeSeriesToMerge); } } } diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/timeseries/TimeSeriesAggregationOperator.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/timeseries/TimeSeriesAggregationOperator.java index af7acc4f19..93ef05949b 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/operator/timeseries/TimeSeriesAggregationOperator.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/timeseries/TimeSeriesAggregationOperator.java @@ -19,7 +19,6 @@ package org.apache.pinot.core.operator.timeseries; import com.google.common.collect.ImmutableList; -import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -33,13 +32,13 @@ import org.apache.pinot.core.common.Operator; import org.apache.pinot.core.operator.BaseOperator; import org.apache.pinot.core.operator.BaseProjectOperator; import org.apache.pinot.core.operator.ExecutionStatistics; +import org.apache.pinot.core.operator.blocks.TimeSeriesBuilderBlock; import org.apache.pinot.core.operator.blocks.ValueBlock; import org.apache.pinot.core.operator.blocks.results.TimeSeriesResultsBlock; import org.apache.pinot.tsdb.spi.AggInfo; import org.apache.pinot.tsdb.spi.TimeBuckets; import org.apache.pinot.tsdb.spi.series.BaseTimeSeriesBuilder; import org.apache.pinot.tsdb.spi.series.TimeSeries; -import org.apache.pinot.tsdb.spi.series.TimeSeriesBlock; import org.apache.pinot.tsdb.spi.series.TimeSeriesBuilderFactory; @@ -123,13 +122,7 @@ public class TimeSeriesAggregationOperator extends BaseOperator<TimeSeriesResult throw new IllegalStateException( "Don't yet support value expression of type: " + valueExpressionBlockValSet.getValueType()); } - Map<Long, List<TimeSeries>> seriesMap = new HashMap<>(); - for (var entry : seriesBuilderMap.entrySet()) { - List<TimeSeries> seriesList = new ArrayList<>(); - seriesList.add(entry.getValue().build()); - seriesMap.put(entry.getKey(), seriesList); - } - return new TimeSeriesResultsBlock(new TimeSeriesBlock(_timeBuckets, seriesMap)); + return new TimeSeriesResultsBlock(new TimeSeriesBuilderBlock(_timeBuckets, seriesBuilderMap)); } @Override diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/timeseries/TimeSeriesPassThroughOperator.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/timeseries/TimeSeriesPassThroughOperator.java deleted file mode 100644 index ea18048342..0000000000 --- a/pinot-core/src/main/java/org/apache/pinot/core/operator/timeseries/TimeSeriesPassThroughOperator.java +++ /dev/null @@ -1,52 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.pinot.core.operator.timeseries; - -import java.util.Collections; -import org.apache.pinot.core.operator.blocks.results.TimeSeriesResultsBlock; -import org.apache.pinot.core.operator.combine.TimeSeriesCombineOperator; -import org.apache.pinot.tsdb.spi.operator.BaseTimeSeriesOperator; -import org.apache.pinot.tsdb.spi.series.TimeSeriesBlock; - - -/** - * Adapter operator that ties in the Pinot TimeSeriesCombineOperator with the Pinot BaseTimeSeriesOperator. - * This operator explicitly calls the underlying combine operator, unwraps the {@link TimeSeriesResultsBlock}, and - * links it with the rest of the operator chain consisting of {@link BaseTimeSeriesOperator}. - */ -public class TimeSeriesPassThroughOperator extends BaseTimeSeriesOperator { - private static final String EXPLAIN_NAME = "TIME_SERIES_PASS_THROUGH_OPERATOR"; - private final TimeSeriesCombineOperator _timeSeriesCombineOperator; - - public TimeSeriesPassThroughOperator(TimeSeriesCombineOperator combineOperator) { - super(Collections.emptyList()); - _timeSeriesCombineOperator = combineOperator; - } - - @Override - public TimeSeriesBlock getNextBlock() { - TimeSeriesResultsBlock resultsBlock = (TimeSeriesResultsBlock) _timeSeriesCombineOperator.nextBlock(); - return resultsBlock.getTimeSeriesBlock(); - } - - @Override - public String getExplainName() { - return EXPLAIN_NAME; - } -} diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/executor/ServerQueryExecutorV1Impl.java b/pinot-core/src/main/java/org/apache/pinot/core/query/executor/ServerQueryExecutorV1Impl.java index f84ef89878..58d3befb56 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/query/executor/ServerQueryExecutorV1Impl.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/query/executor/ServerQueryExecutorV1Impl.java @@ -57,7 +57,6 @@ import org.apache.pinot.core.operator.blocks.results.BaseResultsBlock; import org.apache.pinot.core.operator.blocks.results.ExplainResultsBlock; import org.apache.pinot.core.operator.blocks.results.ExplainV2ResultBlock; import org.apache.pinot.core.operator.blocks.results.ResultsBlockUtils; -import org.apache.pinot.core.operator.blocks.results.TimeSeriesResultsBlock; import org.apache.pinot.core.plan.ExplainInfo; import org.apache.pinot.core.plan.Plan; import org.apache.pinot.core.plan.maker.PlanMaker; @@ -69,7 +68,6 @@ import org.apache.pinot.core.query.request.ServerQueryRequest; import org.apache.pinot.core.query.request.context.QueryContext; import org.apache.pinot.core.query.request.context.TimerContext; import org.apache.pinot.core.query.request.context.utils.QueryContextConverterUtils; -import org.apache.pinot.core.query.request.context.utils.QueryContextUtils; import org.apache.pinot.core.query.utils.idset.IdSet; import org.apache.pinot.core.util.trace.TraceContext; import org.apache.pinot.segment.local.data.manager.SegmentDataManager; @@ -88,7 +86,6 @@ import org.apache.pinot.spi.exception.QueryCancelledException; import org.apache.pinot.spi.plugin.PluginManager; import org.apache.pinot.spi.trace.Tracing; import org.apache.pinot.spi.utils.builder.TableNameBuilder; -import org.apache.pinot.tsdb.spi.series.TimeSeriesBlock; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -731,15 +728,7 @@ public class ServerQueryExecutorV1Impl implements QueryExecutor { List<SegmentContext> selectedSegmentContexts) throws TimeoutException { if (selectedSegmentContexts.isEmpty()) { - if (QueryContextUtils.isTimeSeriesQuery(queryContext)) { - // TODO: handle invalid segments - TimeSeriesBlock seriesBlock = new TimeSeriesBlock( - queryContext.getTimeSeriesContext().getTimeBuckets(), Collections.emptyMap()); - TimeSeriesResultsBlock resultsBlock = new TimeSeriesResultsBlock(seriesBlock); - return new InstanceResponseBlock(resultsBlock); - } else { - return new InstanceResponseBlock(ResultsBlockUtils.buildEmptyQueryResults(queryContext)); - } + return new InstanceResponseBlock(ResultsBlockUtils.buildEmptyQueryResults(queryContext)); } InstanceResponseBlock instanceResponse; Plan queryPlan = planCombineQuery(queryContext, timerContext, executorService, streamer, selectedSegmentContexts); diff --git a/pinot-core/src/test/java/org/apache/pinot/core/query/executor/QueryExecutorTest.java b/pinot-core/src/test/java/org/apache/pinot/core/query/executor/QueryExecutorTest.java index 33d83843f6..ff0e0ace8f 100644 --- a/pinot-core/src/test/java/org/apache/pinot/core/query/executor/QueryExecutorTest.java +++ b/pinot-core/src/test/java/org/apache/pinot/core/query/executor/QueryExecutorTest.java @@ -71,6 +71,7 @@ import org.apache.pinot.tsdb.spi.AggInfo; import org.apache.pinot.tsdb.spi.TimeBuckets; import org.apache.pinot.tsdb.spi.series.SimpleTimeSeriesBuilderFactory; import org.apache.pinot.tsdb.spi.series.TimeSeries; +import org.apache.pinot.tsdb.spi.series.TimeSeriesBlock; import org.apache.pinot.tsdb.spi.series.TimeSeriesBuilderFactoryProvider; import org.testng.Assert; import org.testng.annotations.AfterClass; @@ -226,7 +227,8 @@ public class QueryExecutorTest { InstanceResponseBlock instanceResponse = _queryExecutor.execute(serverQueryRequest, QUERY_RUNNERS); assertTrue(instanceResponse.getResultsBlock() instanceof TimeSeriesResultsBlock); TimeSeriesResultsBlock resultsBlock = (TimeSeriesResultsBlock) instanceResponse.getResultsBlock(); - assertEquals(5, resultsBlock.getTimeSeriesBlock().getSeriesMap().size()); + TimeSeriesBlock timeSeriesBlock = resultsBlock.getTimeSeriesBuilderBlock().build(); + assertEquals(5, timeSeriesBlock.getSeriesMap().size()); } @Test @@ -241,10 +243,11 @@ public class QueryExecutorTest { InstanceResponseBlock instanceResponse = _queryExecutor.execute(serverQueryRequest, QUERY_RUNNERS); assertTrue(instanceResponse.getResultsBlock() instanceof TimeSeriesResultsBlock); TimeSeriesResultsBlock resultsBlock = (TimeSeriesResultsBlock) instanceResponse.getResultsBlock(); - assertEquals(5, resultsBlock.getTimeSeriesBlock().getSeriesMap().size()); + TimeSeriesBlock timeSeriesBlock = resultsBlock.getTimeSeriesBuilderBlock().build(); + assertEquals(5, timeSeriesBlock.getSeriesMap().size()); // For any city, say "New York", the max order item count should be 4 boolean foundNewYork = false; - for (var listOfTimeSeries : resultsBlock.getTimeSeriesBlock().getSeriesMap().values()) { + for (var listOfTimeSeries : timeSeriesBlock.getSeriesMap().values()) { assertEquals(listOfTimeSeries.size(), 1); TimeSeries timeSeries = listOfTimeSeries.get(0); if (timeSeries.getTagValues()[0].equals("New York")) { @@ -272,10 +275,11 @@ public class QueryExecutorTest { InstanceResponseBlock instanceResponse = _queryExecutor.execute(serverQueryRequest, QUERY_RUNNERS); assertTrue(instanceResponse.getResultsBlock() instanceof TimeSeriesResultsBlock); TimeSeriesResultsBlock resultsBlock = (TimeSeriesResultsBlock) instanceResponse.getResultsBlock(); - assertEquals(5, resultsBlock.getTimeSeriesBlock().getSeriesMap().size()); + TimeSeriesBlock timeSeriesBlock = resultsBlock.getTimeSeriesBuilderBlock().build(); + assertEquals(5, timeSeriesBlock.getSeriesMap().size()); // For any city, say "Chicago", the min order item count should be 0 boolean foundChicago = false; - for (var listOfTimeSeries : resultsBlock.getTimeSeriesBlock().getSeriesMap().values()) { + for (var listOfTimeSeries : timeSeriesBlock.getSeriesMap().values()) { assertEquals(listOfTimeSeries.size(), 1); TimeSeries timeSeries = listOfTimeSeries.get(0); if (timeSeries.getTagValues()[0].equals("Chicago")) { diff --git a/pinot-plugins/pinot-timeseries-lang/pinot-timeseries-m3ql/src/main/java/org/apache/pinot/tsdb/m3ql/M3TimeSeriesPlanner.java b/pinot-plugins/pinot-timeseries-lang/pinot-timeseries-m3ql/src/main/java/org/apache/pinot/tsdb/m3ql/M3TimeSeriesPlanner.java index f355e464eb..0d0254128f 100644 --- a/pinot-plugins/pinot-timeseries-lang/pinot-timeseries-m3ql/src/main/java/org/apache/pinot/tsdb/m3ql/M3TimeSeriesPlanner.java +++ b/pinot-plugins/pinot-timeseries-lang/pinot-timeseries-m3ql/src/main/java/org/apache/pinot/tsdb/m3ql/M3TimeSeriesPlanner.java @@ -22,10 +22,10 @@ import com.google.common.base.Preconditions; import java.util.ArrayList; import java.util.List; import java.util.Locale; -import java.util.Map; import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; import java.util.stream.Stream; +import org.apache.pinot.spi.env.PinotConfiguration; import org.apache.pinot.tsdb.m3ql.parser.Tokenizer; import org.apache.pinot.tsdb.m3ql.plan.KeepLastValuePlanNode; import org.apache.pinot.tsdb.m3ql.plan.TransformNullPlanNode; @@ -36,12 +36,12 @@ import org.apache.pinot.tsdb.spi.TimeBuckets; import org.apache.pinot.tsdb.spi.TimeSeriesLogicalPlanResult; import org.apache.pinot.tsdb.spi.TimeSeriesLogicalPlanner; import org.apache.pinot.tsdb.spi.plan.BaseTimeSeriesPlanNode; -import org.apache.pinot.tsdb.spi.plan.ScanFilterAndProjectPlanNode; +import org.apache.pinot.tsdb.spi.plan.LeafTimeSeriesPlanNode; public class M3TimeSeriesPlanner implements TimeSeriesLogicalPlanner { @Override - public void init(Map<String, Object> config) { + public void init(PinotConfiguration pinotConfiguration) { } @Override @@ -152,7 +152,7 @@ public class M3TimeSeriesPlanner implements TimeSeriesLogicalPlanner { Preconditions.checkNotNull(timeColumn, "Time column not set. Set via time_col="); Preconditions.checkNotNull(timeUnit, "Time unit not set. Set via time_unit="); Preconditions.checkNotNull(valueExpr, "Value expression not set. Set via value="); - return new ScanFilterAndProjectPlanNode(planId, children, tableName, timeColumn, timeUnit, 0L, filter, valueExpr, + return new LeafTimeSeriesPlanNode(planId, children, tableName, timeColumn, timeUnit, 0L, filter, valueExpr, aggInfo, groupByColumns); } } diff --git a/pinot-plugins/pinot-timeseries-lang/pinot-timeseries-m3ql/src/main/java/org/apache/pinot/tsdb/m3ql/time/TimeBucketComputer.java b/pinot-plugins/pinot-timeseries-lang/pinot-timeseries-m3ql/src/main/java/org/apache/pinot/tsdb/m3ql/time/TimeBucketComputer.java index d79d5c6a8c..a7353ee6c8 100644 --- a/pinot-plugins/pinot-timeseries-lang/pinot-timeseries-m3ql/src/main/java/org/apache/pinot/tsdb/m3ql/time/TimeBucketComputer.java +++ b/pinot-plugins/pinot-timeseries-lang/pinot-timeseries-m3ql/src/main/java/org/apache/pinot/tsdb/m3ql/time/TimeBucketComputer.java @@ -23,7 +23,7 @@ import java.util.Collection; import org.apache.pinot.tsdb.spi.RangeTimeSeriesRequest; import org.apache.pinot.tsdb.spi.TimeBuckets; import org.apache.pinot.tsdb.spi.plan.BaseTimeSeriesPlanNode; -import org.apache.pinot.tsdb.spi.plan.ScanFilterAndProjectPlanNode; +import org.apache.pinot.tsdb.spi.plan.LeafTimeSeriesPlanNode; public class TimeBucketComputer { @@ -42,7 +42,7 @@ public class TimeBucketComputer { } public static QueryTimeBoundaryConstraints process(BaseTimeSeriesPlanNode planNode, RangeTimeSeriesRequest request) { - if (planNode instanceof ScanFilterAndProjectPlanNode) { + if (planNode instanceof LeafTimeSeriesPlanNode) { QueryTimeBoundaryConstraints constraints = new QueryTimeBoundaryConstraints(); constraints.getDivisors().add(request.getStepSeconds()); return constraints; diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/timeseries/LeafTimeSeriesOperator.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/timeseries/LeafTimeSeriesOperator.java index 74da11c5b8..ca119ebf1d 100644 --- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/timeseries/LeafTimeSeriesOperator.java +++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/timeseries/LeafTimeSeriesOperator.java @@ -53,7 +53,9 @@ public class LeafTimeSeriesOperator extends BaseTimeSeriesOperator { String oneException = instanceResponseBlock.getExceptions().values().iterator().next(); throw new RuntimeException(oneException); } - return ((TimeSeriesResultsBlock) instanceResponseBlock.getResultsBlock()).getTimeSeriesBlock(); + TimeSeriesResultsBlock timeSeriesResultsBlock = + ((TimeSeriesResultsBlock) instanceResponseBlock.getResultsBlock()); + return timeSeriesResultsBlock.getTimeSeriesBuilderBlock().build(); } @Override diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/timeseries/PhysicalTimeSeriesPlanVisitor.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/timeseries/PhysicalTimeSeriesPlanVisitor.java index 258e41c51e..5e42d1b4b5 100644 --- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/timeseries/PhysicalTimeSeriesPlanVisitor.java +++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/timeseries/PhysicalTimeSeriesPlanVisitor.java @@ -33,7 +33,7 @@ import org.apache.pinot.core.query.request.context.QueryContext; import org.apache.pinot.sql.parsers.CalciteSqlParser; import org.apache.pinot.tsdb.spi.operator.BaseTimeSeriesOperator; import org.apache.pinot.tsdb.spi.plan.BaseTimeSeriesPlanNode; -import org.apache.pinot.tsdb.spi.plan.ScanFilterAndProjectPlanNode; +import org.apache.pinot.tsdb.spi.plan.LeafTimeSeriesPlanNode; public class PhysicalTimeSeriesPlanVisitor { @@ -62,8 +62,8 @@ public class PhysicalTimeSeriesPlanVisitor { public void initLeafPlanNode(BaseTimeSeriesPlanNode planNode, TimeSeriesExecutionContext context) { for (int index = 0; index < planNode.getChildren().size(); index++) { BaseTimeSeriesPlanNode childNode = planNode.getChildren().get(index); - if (childNode instanceof ScanFilterAndProjectPlanNode) { - ScanFilterAndProjectPlanNode sfpNode = (ScanFilterAndProjectPlanNode) childNode; + if (childNode instanceof LeafTimeSeriesPlanNode) { + LeafTimeSeriesPlanNode sfpNode = (LeafTimeSeriesPlanNode) childNode; List<String> segments = context.getPlanIdToSegmentsMap().get(sfpNode.getId()); ServerQueryRequest serverQueryRequest = compileLeafServerQueryRequest(sfpNode, segments, context); TimeSeriesPhysicalTableScan physicalTableScan = new TimeSeriesPhysicalTableScan(childNode.getId(), @@ -75,13 +75,13 @@ public class PhysicalTimeSeriesPlanVisitor { } } - public ServerQueryRequest compileLeafServerQueryRequest(ScanFilterAndProjectPlanNode sfpNode, List<String> segments, + public ServerQueryRequest compileLeafServerQueryRequest(LeafTimeSeriesPlanNode sfpNode, List<String> segments, TimeSeriesExecutionContext context) { return new ServerQueryRequest(compileQueryContext(sfpNode, context), segments, /* TODO: Pass metadata from request */ Collections.emptyMap(), _serverMetrics); } - public QueryContext compileQueryContext(ScanFilterAndProjectPlanNode sfpNode, TimeSeriesExecutionContext context) { + public QueryContext compileQueryContext(LeafTimeSeriesPlanNode sfpNode, TimeSeriesExecutionContext context) { FilterContext filterContext = RequestContextUtils.getFilter(CalciteSqlParser.compileToExpression( sfpNode.getEffectiveFilter(context.getInitialTimeBuckets()))); diff --git a/pinot-timeseries/pinot-timeseries-planner/src/main/java/org/apache/pinot/tsdb/planner/TimeSeriesQueryEnvironment.java b/pinot-timeseries/pinot-timeseries-planner/src/main/java/org/apache/pinot/tsdb/planner/TimeSeriesQueryEnvironment.java index dada38f6f3..493efb2361 100644 --- a/pinot-timeseries/pinot-timeseries-planner/src/main/java/org/apache/pinot/tsdb/planner/TimeSeriesQueryEnvironment.java +++ b/pinot-timeseries/pinot-timeseries-planner/src/main/java/org/apache/pinot/tsdb/planner/TimeSeriesQueryEnvironment.java @@ -43,7 +43,7 @@ import org.apache.pinot.tsdb.spi.RangeTimeSeriesRequest; import org.apache.pinot.tsdb.spi.TimeSeriesLogicalPlanResult; import org.apache.pinot.tsdb.spi.TimeSeriesLogicalPlanner; import org.apache.pinot.tsdb.spi.plan.BaseTimeSeriesPlanNode; -import org.apache.pinot.tsdb.spi.plan.ScanFilterAndProjectPlanNode; +import org.apache.pinot.tsdb.spi.plan.LeafTimeSeriesPlanNode; import org.apache.pinot.tsdb.spi.plan.serde.TimeSeriesPlanSerde; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -74,7 +74,7 @@ public class TimeSeriesQueryEnvironment { Class<?> klass = TimeSeriesQueryEnvironment.class.getClassLoader().loadClass(klassName); Constructor<?> constructor = klass.getConstructor(); TimeSeriesLogicalPlanner planner = (TimeSeriesLogicalPlanner) constructor.newInstance(); - planner.init(config.subset(configPrefix).toMap()); + planner.init(config.subset(configPrefix)); _plannerMap.put(language, planner); } catch (Exception e) { throw new RuntimeException("Failed to instantiate logical planner for language: " + language, e); @@ -121,8 +121,8 @@ public class TimeSeriesQueryEnvironment { } public static void findTableNames(BaseTimeSeriesPlanNode planNode, Consumer<String> tableNameConsumer) { - if (planNode instanceof ScanFilterAndProjectPlanNode) { - ScanFilterAndProjectPlanNode scanNode = (ScanFilterAndProjectPlanNode) planNode; + if (planNode instanceof LeafTimeSeriesPlanNode) { + LeafTimeSeriesPlanNode scanNode = (LeafTimeSeriesPlanNode) planNode; tableNameConsumer.accept(scanNode.getTableName()); return; } diff --git a/pinot-timeseries/pinot-timeseries-planner/src/main/java/org/apache/pinot/tsdb/planner/physical/TableScanVisitor.java b/pinot-timeseries/pinot-timeseries-planner/src/main/java/org/apache/pinot/tsdb/planner/physical/TableScanVisitor.java index 58eccd2de0..99728e0f6d 100644 --- a/pinot-timeseries/pinot-timeseries-planner/src/main/java/org/apache/pinot/tsdb/planner/physical/TableScanVisitor.java +++ b/pinot-timeseries/pinot-timeseries-planner/src/main/java/org/apache/pinot/tsdb/planner/physical/TableScanVisitor.java @@ -32,7 +32,7 @@ import org.apache.pinot.core.routing.RoutingTable; import org.apache.pinot.sql.parsers.CalciteSqlParser; import org.apache.pinot.tsdb.spi.TimeBuckets; import org.apache.pinot.tsdb.spi.plan.BaseTimeSeriesPlanNode; -import org.apache.pinot.tsdb.spi.plan.ScanFilterAndProjectPlanNode; +import org.apache.pinot.tsdb.spi.plan.LeafTimeSeriesPlanNode; public class TableScanVisitor { @@ -47,8 +47,8 @@ public class TableScanVisitor { } public void assignSegmentsToPlan(BaseTimeSeriesPlanNode planNode, TimeBuckets timeBuckets, Context context) { - if (planNode instanceof ScanFilterAndProjectPlanNode) { - ScanFilterAndProjectPlanNode sfpNode = (ScanFilterAndProjectPlanNode) planNode; + if (planNode instanceof LeafTimeSeriesPlanNode) { + LeafTimeSeriesPlanNode sfpNode = (LeafTimeSeriesPlanNode) planNode; Expression filterExpression = CalciteSqlParser.compileToExpression(sfpNode.getEffectiveFilter(timeBuckets)); RoutingTable routingTable = _routingManager.getRoutingTable( compileBrokerRequest(sfpNode.getTableName(), filterExpression), diff --git a/pinot-timeseries/pinot-timeseries-spi/src/main/java/org/apache/pinot/tsdb/spi/TimeSeriesLogicalPlanner.java b/pinot-timeseries/pinot-timeseries-spi/src/main/java/org/apache/pinot/tsdb/spi/TimeSeriesLogicalPlanner.java index 0c7e724ca8..b5004ff424 100644 --- a/pinot-timeseries/pinot-timeseries-spi/src/main/java/org/apache/pinot/tsdb/spi/TimeSeriesLogicalPlanner.java +++ b/pinot-timeseries/pinot-timeseries-spi/src/main/java/org/apache/pinot/tsdb/spi/TimeSeriesLogicalPlanner.java @@ -18,9 +18,9 @@ */ package org.apache.pinot.tsdb.spi; -import java.util.Map; +import org.apache.pinot.spi.env.PinotConfiguration; import org.apache.pinot.tsdb.spi.plan.BaseTimeSeriesPlanNode; -import org.apache.pinot.tsdb.spi.plan.ScanFilterAndProjectPlanNode; +import org.apache.pinot.tsdb.spi.plan.LeafTimeSeriesPlanNode; /** @@ -30,10 +30,10 @@ import org.apache.pinot.tsdb.spi.plan.ScanFilterAndProjectPlanNode; * {@link BaseTimeSeriesPlanNode}. Other than the plan-tree, the planner also returns a {@link TimeBuckets} which is * the default TimeBuckets used by the query operators at runtime. Implementations are free to adjust them as they see * fit. For instance, one query language might want to extend to the left or right of the time-range based on certain - * operators. Also, see {@link ScanFilterAndProjectPlanNode#getEffectiveFilter(TimeBuckets)}. + * operators. Also, see {@link LeafTimeSeriesPlanNode#getEffectiveFilter(TimeBuckets)}. */ public interface TimeSeriesLogicalPlanner { - void init(Map<String, Object> config); + void init(PinotConfiguration pinotConfiguration); TimeSeriesLogicalPlanResult plan(RangeTimeSeriesRequest request); } diff --git a/pinot-timeseries/pinot-timeseries-spi/src/main/java/org/apache/pinot/tsdb/spi/plan/ScanFilterAndProjectPlanNode.java b/pinot-timeseries/pinot-timeseries-spi/src/main/java/org/apache/pinot/tsdb/spi/plan/LeafTimeSeriesPlanNode.java similarity index 94% rename from pinot-timeseries/pinot-timeseries-spi/src/main/java/org/apache/pinot/tsdb/spi/plan/ScanFilterAndProjectPlanNode.java rename to pinot-timeseries/pinot-timeseries-spi/src/main/java/org/apache/pinot/tsdb/spi/plan/LeafTimeSeriesPlanNode.java index e2a0a15f27..18d6316776 100644 --- a/pinot-timeseries/pinot-timeseries-spi/src/main/java/org/apache/pinot/tsdb/spi/plan/ScanFilterAndProjectPlanNode.java +++ b/pinot-timeseries/pinot-timeseries-spi/src/main/java/org/apache/pinot/tsdb/spi/plan/LeafTimeSeriesPlanNode.java @@ -35,8 +35,8 @@ import org.apache.pinot.tsdb.spi.operator.BaseTimeSeriesOperator; * <b>Note:</b> You don't need to pass the time-filter to the filter expression, since Pinot will automatically compute * the time filter based on the computed time buckets in {@link TimeSeriesLogicalPlanner}. */ -public class ScanFilterAndProjectPlanNode extends BaseTimeSeriesPlanNode { - private static final String EXPLAIN_NAME = "SCAN_FILTER_AND_PROJECT"; +public class LeafTimeSeriesPlanNode extends BaseTimeSeriesPlanNode { + private static final String EXPLAIN_NAME = "LEAF_TIME_SERIES_PLAN_NODE"; private final String _tableName; private final String _timeColumn; private final TimeUnit _timeUnit; @@ -47,7 +47,7 @@ public class ScanFilterAndProjectPlanNode extends BaseTimeSeriesPlanNode { private final List<String> _groupByColumns; @JsonCreator - public ScanFilterAndProjectPlanNode( + public LeafTimeSeriesPlanNode( @JsonProperty("id") String id, @JsonProperty("children") List<BaseTimeSeriesPlanNode> children, @JsonProperty("tableName") String tableName, @JsonProperty("timeColumn") String timeColumn, @JsonProperty("timeUnit") TimeUnit timeUnit, @JsonProperty("offset") Long offset, @@ -68,7 +68,7 @@ public class ScanFilterAndProjectPlanNode extends BaseTimeSeriesPlanNode { @Override public String getKlass() { - return ScanFilterAndProjectPlanNode.class.getName(); + return LeafTimeSeriesPlanNode.class.getName(); } @Override diff --git a/pinot-timeseries/pinot-timeseries-spi/src/main/java/org/apache/pinot/tsdb/spi/plan/serde/TimeSeriesPlanSerde.java b/pinot-timeseries/pinot-timeseries-spi/src/main/java/org/apache/pinot/tsdb/spi/plan/serde/TimeSeriesPlanSerde.java index e4e036e1ff..1e7775ff5a 100644 --- a/pinot-timeseries/pinot-timeseries-spi/src/main/java/org/apache/pinot/tsdb/spi/plan/serde/TimeSeriesPlanSerde.java +++ b/pinot-timeseries/pinot-timeseries-spi/src/main/java/org/apache/pinot/tsdb/spi/plan/serde/TimeSeriesPlanSerde.java @@ -26,12 +26,12 @@ import com.fasterxml.jackson.databind.node.ArrayNode; import com.fasterxml.jackson.databind.node.ObjectNode; import org.apache.pinot.spi.annotations.InterfaceStability; import org.apache.pinot.tsdb.spi.plan.BaseTimeSeriesPlanNode; -import org.apache.pinot.tsdb.spi.plan.ScanFilterAndProjectPlanNode; +import org.apache.pinot.tsdb.spi.plan.LeafTimeSeriesPlanNode; /** * We have implemented a custom serialization/deserialization mechanism for time series plans. This allows users to - * use Jackson to annotate their plan nodes as shown in {@link ScanFilterAndProjectPlanNode}, which is used for + * use Jackson to annotate their plan nodes as shown in {@link LeafTimeSeriesPlanNode}, which is used for * plan serde for broker/server communication. * TODO: There are limitations to this and we will change this soon. Issues: * 1. Pinot TS SPI is compiled in Pinot distribution and Jackson deps get shaded usually. diff --git a/pinot-timeseries/pinot-timeseries-spi/src/main/java/org/apache/pinot/tsdb/spi/series/BaseTimeSeriesBuilder.java b/pinot-timeseries/pinot-timeseries-spi/src/main/java/org/apache/pinot/tsdb/spi/series/BaseTimeSeriesBuilder.java index 3509e7cfcd..538f921b55 100644 --- a/pinot-timeseries/pinot-timeseries-spi/src/main/java/org/apache/pinot/tsdb/spi/series/BaseTimeSeriesBuilder.java +++ b/pinot-timeseries/pinot-timeseries-spi/src/main/java/org/apache/pinot/tsdb/spi/series/BaseTimeSeriesBuilder.java @@ -71,5 +71,14 @@ public abstract class BaseTimeSeriesBuilder { } } + /** + * Adds an un-built series-builder to this builder. Implementations may want to override this method, especially for + * complex aggregations, where the series builder accumulates results in a complex object. (e.g. percentile) + */ + public void mergeAlignedSeriesBuilder(BaseTimeSeriesBuilder builder) { + TimeSeries timeSeries = builder.build(); + mergeAlignedSeries(timeSeries); + } + public abstract TimeSeries build(); } diff --git a/pinot-timeseries/pinot-timeseries-spi/src/test/java/org/apache/pinot/tsdb/spi/plan/serde/TimeSeriesPlanSerdeTest.java b/pinot-timeseries/pinot-timeseries-spi/src/test/java/org/apache/pinot/tsdb/spi/plan/serde/TimeSeriesPlanSerdeTest.java index ff74b6ef35..a5015dc991 100644 --- a/pinot-timeseries/pinot-timeseries-spi/src/test/java/org/apache/pinot/tsdb/spi/plan/serde/TimeSeriesPlanSerdeTest.java +++ b/pinot-timeseries/pinot-timeseries-spi/src/test/java/org/apache/pinot/tsdb/spi/plan/serde/TimeSeriesPlanSerdeTest.java @@ -22,7 +22,7 @@ import java.util.ArrayList; import java.util.concurrent.TimeUnit; import org.apache.pinot.tsdb.spi.AggInfo; import org.apache.pinot.tsdb.spi.plan.BaseTimeSeriesPlanNode; -import org.apache.pinot.tsdb.spi.plan.ScanFilterAndProjectPlanNode; +import org.apache.pinot.tsdb.spi.plan.LeafTimeSeriesPlanNode; import org.testng.annotations.Test; import static org.testng.Assert.*; @@ -31,15 +31,15 @@ import static org.testng.Assert.*; public class TimeSeriesPlanSerdeTest { @Test public void testSerdeForScanFilterProjectNode() { - ScanFilterAndProjectPlanNode scanFilterAndProjectPlanNode = new ScanFilterAndProjectPlanNode( + LeafTimeSeriesPlanNode leafTimeSeriesPlanNode = new LeafTimeSeriesPlanNode( "sfp#0", new ArrayList<>(), "myTable", "myTimeColumn", TimeUnit.MILLISECONDS, 0L, "myFilterExpression", "myValueExpression", new AggInfo("SUM"), new ArrayList<>() ); BaseTimeSeriesPlanNode planNode = - TimeSeriesPlanSerde.deserialize(TimeSeriesPlanSerde.serialize(scanFilterAndProjectPlanNode)); - assertTrue(planNode instanceof ScanFilterAndProjectPlanNode); - ScanFilterAndProjectPlanNode deserializedNode = (ScanFilterAndProjectPlanNode) planNode; + TimeSeriesPlanSerde.deserialize(TimeSeriesPlanSerde.serialize(leafTimeSeriesPlanNode)); + assertTrue(planNode instanceof LeafTimeSeriesPlanNode); + LeafTimeSeriesPlanNode deserializedNode = (LeafTimeSeriesPlanNode) planNode; assertEquals(deserializedNode.getTableName(), "myTable"); assertEquals(deserializedNode.getTimeColumn(), "myTimeColumn"); assertEquals(deserializedNode.getTimeUnit(), TimeUnit.MILLISECONDS); --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org