This is an automated email from the ASF dual-hosted git repository. jackie pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push: new b828280882 Part-1: Pinot Timeseries Engine SPI (#13885) b828280882 is described below commit b8282808826b143cabcbf59fa408493fe9d03665 Author: Ankit Sultana <ankitsult...@uber.com> AuthorDate: Tue Sep 10 02:11:09 2024 +0530 Part-1: Pinot Timeseries Engine SPI (#13885) --- .gitignore | 3 +- pinot-timeseries/pinot-timeseries-spi/pom.xml | 49 ++++++++ .../java/org/apache/pinot/tsdb/spi/AggInfo.java | 44 +++++++ .../tsdb/spi/PinotTimeSeriesConfiguration.java | 57 +++++++++ .../pinot/tsdb/spi/RangeTimeSeriesRequest.java | 103 +++++++++++++++++ .../org/apache/pinot/tsdb/spi/TimeBuckets.java | 104 +++++++++++++++++ .../tsdb/spi/TimeSeriesLogicalPlanResult.java | 43 +++++++ .../pinot/tsdb/spi/TimeSeriesLogicalPlanner.java | 39 +++++++ .../tsdb/spi/operator/BaseTimeSeriesOperator.java | 65 +++++++++++ .../tsdb/spi/plan/BaseTimeSeriesPlanNode.java | 55 +++++++++ .../spi/plan/ScanFilterAndProjectPlanNode.java | 127 +++++++++++++++++++++ .../tsdb/spi/plan/serde/TimeSeriesPlanSerde.java | 95 +++++++++++++++ .../tsdb/spi/series/BaseTimeSeriesBuilder.java | 75 ++++++++++++ .../apache/pinot/tsdb/spi/series/TimeSeries.java | 113 ++++++++++++++++++ .../pinot/tsdb/spi/series/TimeSeriesBlock.java | 48 ++++++++ .../tsdb/spi/series/TimeSeriesBuilderFactory.java | 36 ++++++ .../series/TimeSeriesBuilderFactoryProvider.java | 63 ++++++++++ .../spi/series/builders/MaxTimeSeriesBuilder.java | 57 +++++++++ .../spi/series/builders/MinTimeSeriesBuilder.java | 57 +++++++++ .../series/builders/SummingTimeSeriesBuilder.java | 55 +++++++++ .../spi/plan/serde/TimeSeriesPlanSerdeTest.java | 52 +++++++++ pinot-timeseries/pom.xml | 44 +++++++ pom.xml | 1 + 23 files changed, 1384 insertions(+), 1 deletion(-) diff --git a/.gitignore b/.gitignore index ca69e90d0e..00404492f8 100644 --- a/.gitignore +++ b/.gitignore @@ -45,7 +45,8 @@ yarn-error.log* quickstart* #build symlink directory -build* +build +build/* #helm related files 
kubernetes/helm/**/charts/ diff --git a/pinot-timeseries/pinot-timeseries-spi/pom.xml b/pinot-timeseries/pinot-timeseries-spi/pom.xml new file mode 100644 index 0000000000..c21e9971d5 --- /dev/null +++ b/pinot-timeseries/pinot-timeseries-spi/pom.xml @@ -0,0 +1,49 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + + Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, + software distributed under the License is distributed on an + "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + KIND, either express or implied. See the License for the + specific language governing permissions and limitations + under the License. 
+ +--> +<project xmlns="http://maven.apache.org/POM/4.0.0" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <modelVersion>4.0.0</modelVersion> + <parent> + <groupId>org.apache.pinot</groupId> + <artifactId>pinot</artifactId> + <version>1.3.0-SNAPSHOT</version> + <relativePath>../../pom.xml</relativePath> + </parent> + + <artifactId>pinot-timeseries-spi</artifactId> + + <properties> + <pinot.root>${basedir}/../..</pinot.root> + </properties> + <dependencies> + <dependency> + <groupId>org.apache.pinot</groupId> + <artifactId>pinot-spi</artifactId> + </dependency> + <dependency> + <groupId>org.testng</groupId> + <artifactId>testng</artifactId> + <scope>test</scope> + </dependency> + </dependencies> +</project> diff --git a/pinot-timeseries/pinot-timeseries-spi/src/main/java/org/apache/pinot/tsdb/spi/AggInfo.java b/pinot-timeseries/pinot-timeseries-spi/src/main/java/org/apache/pinot/tsdb/spi/AggInfo.java new file mode 100644 index 0000000000..03d9cc8aa9 --- /dev/null +++ b/pinot-timeseries/pinot-timeseries-spi/src/main/java/org/apache/pinot/tsdb/spi/AggInfo.java @@ -0,0 +1,44 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.tsdb.spi; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.google.common.base.Preconditions; + + +/** + * AggInfo is used to represent the aggregation function. Aggregation functions are simply stored as a string, + * since time-series languages are allowed to implement their own aggregation functions. + * TODO: We will likely be adding more parameters to this. One candidate is partial/full aggregation information or + * aggregation result type to allow for intermediate result types. + */ +public class AggInfo { + private final String _aggFunction; + + @JsonCreator + public AggInfo(@JsonProperty("aggFunction") String aggFunction) { + Preconditions.checkNotNull(aggFunction, "Received null aggFunction in AggInfo"); + _aggFunction = aggFunction; + } + + public String getAggFunction() { + return _aggFunction; + } +} diff --git a/pinot-timeseries/pinot-timeseries-spi/src/main/java/org/apache/pinot/tsdb/spi/PinotTimeSeriesConfiguration.java b/pinot-timeseries/pinot-timeseries-spi/src/main/java/org/apache/pinot/tsdb/spi/PinotTimeSeriesConfiguration.java new file mode 100644 index 0000000000..d0fac4369f --- /dev/null +++ b/pinot-timeseries/pinot-timeseries-spi/src/main/java/org/apache/pinot/tsdb/spi/PinotTimeSeriesConfiguration.java @@ -0,0 +1,57 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.tsdb.spi; + +import org.apache.pinot.tsdb.spi.series.TimeSeriesBuilderFactory; + + +public class PinotTimeSeriesConfiguration { + private PinotTimeSeriesConfiguration() { + } + + public static final String CONFIG_PREFIX = "pinot.timeseries"; + private static final String ENABLE_LANGUAGES_SUFFIX = ".languages"; + private static final String SERIES_BUILDER_FACTORY_SUFFIX = ".series.builder.factory"; + private static final String LOGICAL_PLANNER_CLASS_SUFFIX = ".logical.planner.class"; + + /** + * Config key that controls which time-series languages are enabled in a given Pinot cluster. + */ + public static String getEnabledLanguagesConfigKey() { + return CONFIG_PREFIX + ENABLE_LANGUAGES_SUFFIX; + } + + /** + * Returns the config key which determines the class name for the {@link TimeSeriesBuilderFactory} to be used for a + * given language. Each language can have its own {@link TimeSeriesBuilderFactory}, which allows each language to + * support custom time-series functions. + */ + public static String getSeriesBuilderFactoryConfigKey(String language) { + return CONFIG_PREFIX + "." + language + SERIES_BUILDER_FACTORY_SUFFIX; + } + + /** + * Returns config key which determines the class name for the {@link TimeSeriesLogicalPlanner} to be used for a given + * language. Pinot broker will load this logical planner on start-up dynamically. This is called for each language + * configured via {@link #getEnabledLanguagesConfigKey()}. 
+ */ + public static String getLogicalPlannerConfigKey(String language) { + return CONFIG_PREFIX + "." + language + LOGICAL_PLANNER_CLASS_SUFFIX; + } +} diff --git a/pinot-timeseries/pinot-timeseries-spi/src/main/java/org/apache/pinot/tsdb/spi/RangeTimeSeriesRequest.java b/pinot-timeseries/pinot-timeseries-spi/src/main/java/org/apache/pinot/tsdb/spi/RangeTimeSeriesRequest.java new file mode 100644 index 0000000000..2c4fc045a4 --- /dev/null +++ b/pinot-timeseries/pinot-timeseries-spi/src/main/java/org/apache/pinot/tsdb/spi/RangeTimeSeriesRequest.java @@ -0,0 +1,103 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.tsdb.spi; + +import com.google.common.base.Preconditions; +import java.time.Duration; + + +/** + * A time-series request received by the Pinot Broker. This is passed to the {@link TimeSeriesLogicalPlanner} so + * each query language can parse and plan the query based on their spec. + * <br/> + * <br/> + * <b>Notes:</b> + * <ul> + * <li>[start, end] are both inclusive.</li> + * <li> + * The result can contain time values outside [start, end], though we generally recommend to keep your results + * within the requested range. 
This decision is left to the time-series query language implementations. In some + * cases, returning data outside the requested time-range can help (e.g. for debugging purposes when you are + * computing moving 1d sum but are only looking at data for the last 12 hours). + * </li> + * <li>stepSeconds is used to define the default resolution for the query</li> + * <li> + * Some query languages allow users to change the resolution via a function, and in those cases the returned + * time-series may have a resolution different than stepSeconds + * </li> + * <li> + * The query execution may scan and process data outside of the time-range [start, end]. The actual data scanned + * and processed is defined by the {@link TimeBuckets} used by the operator. + * </li> + * </ul> + */ +public class RangeTimeSeriesRequest { + /** Engine allows a Pinot cluster to support multiple Time-Series Query Languages. */ + private final String _engine; + /** Query is the raw query sent by the caller. */ + private final String _query; + /** Start time of the time-window being queried. */ + private final long _startSeconds; + /** End time of the time-window being queried. */ + private final long _endSeconds; + /** + * <b>Optional</b> field which the caller can use to suggest the default resolution for the query. Language + * implementations can choose to skip this suggestion and choose their own resolution based on their semantics. + */ + private final long _stepSeconds; + /** E2E timeout for the query. */ + private final Duration _timeout; + + public RangeTimeSeriesRequest(String engine, String query, long startSeconds, long endSeconds, long stepSeconds, + Duration timeout) { + Preconditions.checkState(endSeconds >= startSeconds, "Invalid range. startSeconds " + + "should be greater than or equal to endSeconds. 
Found startSeconds=%s and endSeconds=%s", + startSeconds, endSeconds); + _engine = engine; + _query = query; + _startSeconds = startSeconds; + _endSeconds = endSeconds; + _stepSeconds = stepSeconds; + _timeout = timeout; + } + + public String getEngine() { + return _engine; + } + + public String getQuery() { + return _query; + } + + public long getStartSeconds() { + return _startSeconds; + } + + public long getEndSeconds() { + return _endSeconds; + } + + public long getStepSeconds() { + return _stepSeconds; + } + + public Duration getTimeout() { + return _timeout; + } +} diff --git a/pinot-timeseries/pinot-timeseries-spi/src/main/java/org/apache/pinot/tsdb/spi/TimeBuckets.java b/pinot-timeseries/pinot-timeseries-spi/src/main/java/org/apache/pinot/tsdb/spi/TimeBuckets.java new file mode 100644 index 0000000000..866249845e --- /dev/null +++ b/pinot-timeseries/pinot-timeseries-spi/src/main/java/org/apache/pinot/tsdb/spi/TimeBuckets.java @@ -0,0 +1,104 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.tsdb.spi; + +import java.time.Duration; +import java.util.Arrays; +import java.util.Objects; + + +/** + * Time buckets used for query execution. 
Each element (say x) in the {@link #getTimeBuckets()} array represents a + * time-range which is half open on the right side: [x, x + bucketSize.getSeconds()). Some query languages allow some + * operators to mutate the time-buckets on the fly, so it is not guaranteed that the time resolution and/or range + * will be the same across all operators. For instance, Uber's M3QL supports a "summarize 1h sum" operator which will + * change the bucket resolution to 1 hour for all subsequent operators. + */ +public class TimeBuckets { + private final Long[] _timeBuckets; + private final Duration _bucketSize; + + private TimeBuckets(Long[] timeBuckets, Duration bucketSize) { + _timeBuckets = timeBuckets; + _bucketSize = bucketSize; + } + + public Long[] getTimeBuckets() { + return _timeBuckets; + } + + public Duration getBucketSize() { + return _bucketSize; + } + + public long getStartTime() { + return _timeBuckets[0]; + } + + public long getEndTime() { + return _timeBuckets[_timeBuckets.length - 1]; + } + + public long getRangeSeconds() { + return _timeBuckets[_timeBuckets.length - 1] - _timeBuckets[0]; + } + + public int getNumBuckets() { + return _timeBuckets.length; + } + + public int resolveIndex(long timeValue) { + if (_timeBuckets.length == 0) { + return -1; + } + if (timeValue < _timeBuckets[0]) { + return -1; + } + if (timeValue >= _timeBuckets[_timeBuckets.length - 1] + _bucketSize.getSeconds()) { + return -1; + } + return (int) ((timeValue - _timeBuckets[0]) / _bucketSize.getSeconds()); + } + + @Override + public boolean equals(Object o) { + if (!(o instanceof TimeBuckets)) { + return false; + } + TimeBuckets other = (TimeBuckets) o; + return this.getStartTime() == other.getStartTime() && this.getEndTime() == other.getEndTime() + && this.getBucketSize().equals(other.getBucketSize()); + } + + @Override + public int hashCode() { + int result = Objects.hash(_bucketSize); + result = 31 * result + Arrays.hashCode(_timeBuckets); + return result; + } + + public static 
TimeBuckets ofSeconds(long startTimeSeconds, Duration bucketSize, int numElements) { + long stepSize = bucketSize.getSeconds(); + Long[] timeBuckets = new Long[numElements]; + for (int i = 0; i < numElements; i++) { + timeBuckets[i] = startTimeSeconds + i * stepSize; + } + return new TimeBuckets(timeBuckets, bucketSize); + } +} diff --git a/pinot-timeseries/pinot-timeseries-spi/src/main/java/org/apache/pinot/tsdb/spi/TimeSeriesLogicalPlanResult.java b/pinot-timeseries/pinot-timeseries-spi/src/main/java/org/apache/pinot/tsdb/spi/TimeSeriesLogicalPlanResult.java new file mode 100644 index 0000000000..829a4042bb --- /dev/null +++ b/pinot-timeseries/pinot-timeseries-spi/src/main/java/org/apache/pinot/tsdb/spi/TimeSeriesLogicalPlanResult.java @@ -0,0 +1,43 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.tsdb.spi; + +import org.apache.pinot.tsdb.spi.plan.BaseTimeSeriesPlanNode; + + +/** + * The result of logical planning. 
+ */ +public class TimeSeriesLogicalPlanResult { + private final BaseTimeSeriesPlanNode _planNode; + private final TimeBuckets _timeBuckets; + + public TimeSeriesLogicalPlanResult(BaseTimeSeriesPlanNode planNode, TimeBuckets timeBuckets) { + _planNode = planNode; + _timeBuckets = timeBuckets; + } + + public BaseTimeSeriesPlanNode getPlanNode() { + return _planNode; + } + + public TimeBuckets getTimeBuckets() { + return _timeBuckets; + } +} diff --git a/pinot-timeseries/pinot-timeseries-spi/src/main/java/org/apache/pinot/tsdb/spi/TimeSeriesLogicalPlanner.java b/pinot-timeseries/pinot-timeseries-spi/src/main/java/org/apache/pinot/tsdb/spi/TimeSeriesLogicalPlanner.java new file mode 100644 index 0000000000..0c7e724ca8 --- /dev/null +++ b/pinot-timeseries/pinot-timeseries-spi/src/main/java/org/apache/pinot/tsdb/spi/TimeSeriesLogicalPlanner.java @@ -0,0 +1,39 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.tsdb.spi; + +import java.util.Map; +import org.apache.pinot.tsdb.spi.plan.BaseTimeSeriesPlanNode; +import org.apache.pinot.tsdb.spi.plan.ScanFilterAndProjectPlanNode; + + +/** + * Allows time-series query languages to implement their own logical planner. 
The input to this planner is a + * {@link RangeTimeSeriesRequest} and the output is a {@link TimeSeriesLogicalPlanResult}. Put simply, this abstraction + * takes in the query text and other parameters, and returns a logical plan which is a tree of + * {@link BaseTimeSeriesPlanNode}. Other than the plan-tree, the planner also returns a {@link TimeBuckets} which is + * the default TimeBuckets used by the query operators at runtime. Implementations are free to adjust them as they see + * fit. For instance, one query language might want to extend to the left or right of the time-range based on certain + * operators. Also, see {@link ScanFilterAndProjectPlanNode#getEffectiveFilter(TimeBuckets)}. + */ +public interface TimeSeriesLogicalPlanner { + void init(Map<String, Object> config); + + TimeSeriesLogicalPlanResult plan(RangeTimeSeriesRequest request); +} diff --git a/pinot-timeseries/pinot-timeseries-spi/src/main/java/org/apache/pinot/tsdb/spi/operator/BaseTimeSeriesOperator.java b/pinot-timeseries/pinot-timeseries-spi/src/main/java/org/apache/pinot/tsdb/spi/operator/BaseTimeSeriesOperator.java new file mode 100644 index 0000000000..28be511c37 --- /dev/null +++ b/pinot-timeseries/pinot-timeseries-spi/src/main/java/org/apache/pinot/tsdb/spi/operator/BaseTimeSeriesOperator.java @@ -0,0 +1,65 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.tsdb.spi.operator; + +import java.util.List; +import org.apache.pinot.tsdb.spi.plan.BaseTimeSeriesPlanNode; +import org.apache.pinot.tsdb.spi.series.TimeSeriesBlock; + + +/** + * Every time-series operator takes in a {@link TimeSeriesBlock} and returns another {@link TimeSeriesBlock}. + * Parent operators/callers must call {@link #nextBlock()} to get the next block from the child operators, and implement + * {@link #getNextBlock()} to implement the business logic for their operator. Also see {@link BaseTimeSeriesPlanNode}. + * TODO: Add common hierarchy with other operators like Multistage and Pinot core. This will likely require us to + * define a pinot-query-spi or add/move some abstractions to pinot-spi. + */ +public abstract class BaseTimeSeriesOperator { + protected final List<BaseTimeSeriesOperator> _childOperators; + + public BaseTimeSeriesOperator(List<BaseTimeSeriesOperator> childOperators) { + _childOperators = childOperators; + } + + /** + * Called by parent time-series operators. + */ + public final TimeSeriesBlock nextBlock() { + long startTime = System.currentTimeMillis(); + try { + return getNextBlock(); + } finally { + // TODO: add stats + } + } + + public List<BaseTimeSeriesOperator> getChildOperators() { + return _childOperators; + } + + /** + * Time series query languages can implement their own business logic in their operators. + */ + public abstract TimeSeriesBlock getNextBlock(); + + /** + * Name that will show up in the explain plan. 
+ */ + public abstract String getExplainName(); +} diff --git a/pinot-timeseries/pinot-timeseries-spi/src/main/java/org/apache/pinot/tsdb/spi/plan/BaseTimeSeriesPlanNode.java b/pinot-timeseries/pinot-timeseries-spi/src/main/java/org/apache/pinot/tsdb/spi/plan/BaseTimeSeriesPlanNode.java new file mode 100644 index 0000000000..dd7a951752 --- /dev/null +++ b/pinot-timeseries/pinot-timeseries-spi/src/main/java/org/apache/pinot/tsdb/spi/plan/BaseTimeSeriesPlanNode.java @@ -0,0 +1,55 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.tsdb.spi.plan; + +import java.util.List; +import org.apache.pinot.tsdb.spi.operator.BaseTimeSeriesOperator; + + +/** + * Generic plan node for time series queries. This allows each time-series query language to define their own plan + * nodes, which in turn generate the language specific {@link BaseTimeSeriesOperator}. 
+ */ +public abstract class BaseTimeSeriesPlanNode { + protected final String _id; + protected final List<BaseTimeSeriesPlanNode> _children; + + public BaseTimeSeriesPlanNode(String id, List<BaseTimeSeriesPlanNode> children) { + _id = id; + _children = children; + } + + public String getId() { + return _id; + } + + public List<BaseTimeSeriesPlanNode> getChildren() { + return _children; + } + + public void addChildNode(BaseTimeSeriesPlanNode planNode) { + _children.add(planNode); + } + + public abstract String getKlass(); + + public abstract String getExplainName(); + + public abstract BaseTimeSeriesOperator run(); +} diff --git a/pinot-timeseries/pinot-timeseries-spi/src/main/java/org/apache/pinot/tsdb/spi/plan/ScanFilterAndProjectPlanNode.java b/pinot-timeseries/pinot-timeseries-spi/src/main/java/org/apache/pinot/tsdb/spi/plan/ScanFilterAndProjectPlanNode.java new file mode 100644 index 0000000000..e2a0a15f27 --- /dev/null +++ b/pinot-timeseries/pinot-timeseries-spi/src/main/java/org/apache/pinot/tsdb/spi/plan/ScanFilterAndProjectPlanNode.java @@ -0,0 +1,127 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.pinot.tsdb.spi.plan; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import java.time.Duration; +import java.util.List; +import java.util.concurrent.TimeUnit; +import org.apache.pinot.tsdb.spi.AggInfo; +import org.apache.pinot.tsdb.spi.TimeBuckets; +import org.apache.pinot.tsdb.spi.TimeSeriesLogicalPlanner; +import org.apache.pinot.tsdb.spi.operator.BaseTimeSeriesOperator; + + +/** + * This would typically be the leaf node of a plan-tree generated by a time-series engine's logical planner. At runtime, + * this gets compiled to a Combine Operator. + * <b>Note:</b> You don't need to pass the time-filter to the filter expression, since Pinot will automatically compute + * the time filter based on the computed time buckets in {@link TimeSeriesLogicalPlanner}. + */ +public class ScanFilterAndProjectPlanNode extends BaseTimeSeriesPlanNode { + private static final String EXPLAIN_NAME = "SCAN_FILTER_AND_PROJECT"; + private final String _tableName; + private final String _timeColumn; + private final TimeUnit _timeUnit; + private final Long _offset; + private final String _filterExpression; + private final String _valueExpression; + private final AggInfo _aggInfo; + private final List<String> _groupByColumns; + + @JsonCreator + public ScanFilterAndProjectPlanNode( + @JsonProperty("id") String id, @JsonProperty("children") List<BaseTimeSeriesPlanNode> children, + @JsonProperty("tableName") String tableName, @JsonProperty("timeColumn") String timeColumn, + @JsonProperty("timeUnit") TimeUnit timeUnit, @JsonProperty("offset") Long offset, + @JsonProperty("filterExpression") String filterExpression, + @JsonProperty("valueExpression") String valueExpression, + @JsonProperty("aggInfo") AggInfo aggInfo, @JsonProperty("groupByColumns") List<String> groupByColumns) { + super(id, children); + _tableName = tableName; + _timeColumn = timeColumn; + _timeUnit = timeUnit; + // TODO: This is broken 
technically. Adjust offset to meet TimeUnit resolution. For now use 0 offset. + _offset = offset; + _filterExpression = filterExpression; + _valueExpression = valueExpression; + _aggInfo = aggInfo; + _groupByColumns = groupByColumns; + } + + @Override + public String getKlass() { + return ScanFilterAndProjectPlanNode.class.getName(); + } + + @Override + public String getExplainName() { + return EXPLAIN_NAME; + } + + @Override + public BaseTimeSeriesOperator run() { + throw new UnsupportedOperationException(""); + } + + public String getTableName() { + return _tableName; + } + + public String getTimeColumn() { + return _timeColumn; + } + + public TimeUnit getTimeUnit() { + return _timeUnit; + } + + public Long getOffset() { + return _offset; + } + + public String getFilterExpression() { + return _filterExpression; + } + + public String getValueExpression() { + return _valueExpression; + } + + public AggInfo getAggInfo() { + return _aggInfo; + } + + public List<String> getGroupByColumns() { + return _groupByColumns; + } + + public String getEffectiveFilter(TimeBuckets timeBuckets) { + String filter = _filterExpression == null ? "" : _filterExpression; + // TODO: This is wrong. offset should be converted to seconds before arithmetic. For now use 0 offset. 
+ long startTime = _timeUnit.convert(Duration.ofSeconds(timeBuckets.getStartTime() - _offset)); + long endTime = _timeUnit.convert(Duration.ofSeconds(timeBuckets.getEndTime() - _offset)); + String addnFilter = String.format("%s >= %d AND %s <= %d", _timeColumn, startTime, _timeColumn, endTime); + if (filter.strip().isEmpty()) { + return addnFilter; + } + return String.format("(%s) AND (%s)", filter, addnFilter); + } +} diff --git a/pinot-timeseries/pinot-timeseries-spi/src/main/java/org/apache/pinot/tsdb/spi/plan/serde/TimeSeriesPlanSerde.java b/pinot-timeseries/pinot-timeseries-spi/src/main/java/org/apache/pinot/tsdb/spi/plan/serde/TimeSeriesPlanSerde.java new file mode 100644 index 0000000000..e4e036e1ff --- /dev/null +++ b/pinot-timeseries/pinot-timeseries-spi/src/main/java/org/apache/pinot/tsdb/spi/plan/serde/TimeSeriesPlanSerde.java @@ -0,0 +1,95 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */
+package org.apache.pinot.tsdb.spi.plan.serde;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.DeserializationFeature;
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ArrayNode;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import org.apache.pinot.spi.annotations.InterfaceStability;
+import org.apache.pinot.tsdb.spi.plan.BaseTimeSeriesPlanNode;
+import org.apache.pinot.tsdb.spi.plan.ScanFilterAndProjectPlanNode;
+
+
+/**
+ * We have implemented a custom serialization/deserialization mechanism for time series plans. This allows users to
+ * use Jackson to annotate their plan nodes as shown in {@link ScanFilterAndProjectPlanNode}, which is used for
+ * plan serde for broker/server communication.
+ * <p>
+ * Every serialized node carries a {@code klass} field with the fully-qualified name of its class. Children are stored
+ * under {@code children} and are rebuilt recursively by {@link #create(JsonNode)} instead of being bound by Jackson.
+ * </p>
+ * TODO: There are limitations to this and we will change this soon. Issues:
+ *  1. Pinot TS SPI is compiled in Pinot distribution and Jackson deps get shaded usually.
+ *  2. The plugins have to shade the dependency in the exact same way, which is obviously error-prone and not ideal.
+ */
+@InterfaceStability.Evolving
+public class TimeSeriesPlanSerde {
+  private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+  /** JSON field holding the fully-qualified class name of a plan node. */
+  private static final String KLASS_FIELD = "klass";
+  /** JSON field holding a plan node's children, which are deserialized manually. */
+  private static final String CHILDREN_FIELD = "children";
+
+  static {
+    OBJECT_MAPPER.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
+  }
+
+  private TimeSeriesPlanSerde() {
+  }
+
+  /**
+   * Serializes a plan tree to its JSON form.
+   *
+   * @throws RuntimeException wrapping any serialization failure
+   */
+  public static String serialize(BaseTimeSeriesPlanNode planNode) {
+    try {
+      return OBJECT_MAPPER.writeValueAsString(planNode);
+    } catch (Exception e) {
+      throw new RuntimeException("Caught exception while serializing plan", e);
+    }
+  }
+
+  /**
+   * Parses a plan tree produced by {@link #serialize(BaseTimeSeriesPlanNode)}.
+   *
+   * @throws RuntimeException wrapping any parse or validation failure
+   */
+  public static BaseTimeSeriesPlanNode deserialize(String planString) {
+    try {
+      JsonNode jsonNode = OBJECT_MAPPER.readTree(planString);
+      return create(jsonNode);
+    } catch (Exception e) {
+      throw new RuntimeException("Caught exception while deserializing plan", e);
+    }
+  }
+
+  /**
+   * Recursively builds a plan node, and all of its children, from its JSON form.
+   * <p>
+   * Side effect: the {@code children} field of {@code jsonNode} is replaced with an empty array.
+   * </p>
+   *
+   * @throws IllegalArgumentException if {@code jsonNode} is not a JSON object, has no {@code klass} field, or names a
+   *     class that is not a {@link BaseTimeSeriesPlanNode}
+   * @throws ClassNotFoundException if the class named by {@code klass} cannot be found
+   * @throws JsonProcessingException if Jackson fails to bind the node
+   */
+  public static BaseTimeSeriesPlanNode create(JsonNode jsonNode)
+      throws JsonProcessingException, ClassNotFoundException {
+    if (!(jsonNode instanceof ObjectNode)) {
+      throw new IllegalArgumentException("Expected a JSON object for a plan node, found: "
+          + (jsonNode == null ? "null" : jsonNode.getNodeType()));
+    }
+    ObjectNode objectNode = (ObjectNode) jsonNode;
+    // Detach the children so Jackson does not try to bind them; they are rebuilt recursively below.
+    JsonNode children = objectNode.remove(CHILDREN_FIELD);
+    objectNode.set(CHILDREN_FIELD, OBJECT_MAPPER.createArrayNode());
+
+    BaseTimeSeriesPlanNode planNode = OBJECT_MAPPER.treeToValue(objectNode, resolvePlanNodeClass(objectNode));
+    if (planNode != null && children instanceof ArrayNode) {
+      for (JsonNode childJsonNode : children) {
+        planNode.addChildNode(create(childJsonNode));
+      }
+    }
+    return planNode;
+  }
+
+  /**
+   * Resolves the class named by the {@code klass} field and ensures it is a {@link BaseTimeSeriesPlanNode}.
+   * The name comes from the wire, so the class is loaded without running its static initializers and is checked
+   * before Jackson is allowed to instantiate it.
+   */
+  private static Class<? extends BaseTimeSeriesPlanNode> resolvePlanNodeClass(ObjectNode objectNode)
+      throws ClassNotFoundException {
+    JsonNode klassNode = objectNode.get(KLASS_FIELD);
+    if (klassNode == null || klassNode.isNull()) {
+      throw new IllegalArgumentException("Plan node JSON is missing the '" + KLASS_FIELD + "' field");
+    }
+    String klassName = klassNode.asText();
+    Class<?> klass = Class.forName(klassName, false, TimeSeriesPlanSerde.class.getClassLoader());
+    if (!BaseTimeSeriesPlanNode.class.isAssignableFrom(klass)) {
+      throw new IllegalArgumentException(
+          "Class " + klassName + " is not a " + BaseTimeSeriesPlanNode.class.getName());
+    }
+    return klass.asSubclass(BaseTimeSeriesPlanNode.class);
+  }
+}
diff --git a/pinot-timeseries/pinot-timeseries-spi/src/main/java/org/apache/pinot/tsdb/spi/series/BaseTimeSeriesBuilder.java b/pinot-timeseries/pinot-timeseries-spi/src/main/java/org/apache/pinot/tsdb/spi/series/BaseTimeSeriesBuilder.java
new file mode 100644
index 0000000000..3509e7cfcd
--- /dev/null
+++ b/pinot-timeseries/pinot-timeseries-spi/src/main/java/org/apache/pinot/tsdb/spi/series/BaseTimeSeriesBuilder.java
@@ -0,0 +1,75 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.
+ * See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.tsdb.spi.series;
+
+import java.util.List;
+import java.util.Objects;
+import javax.annotation.Nullable;
+import org.apache.pinot.tsdb.spi.TimeBuckets;
+
+
+/**
+ * Base class that query-language implementations extend to build aggregations and other time-series functions.
+ * An operator typically feeds data points through {@link #addValue} (keyed by raw time value) or
+ * {@link #addValueAtIndex} (keyed by time-bucket index). When done, it calls {@link #build()} to obtain the resulting
+ * {@link TimeSeries}.
+ */
+public abstract class BaseTimeSeriesBuilder {
+  /** Identifier of the series being built. */
+  protected final String _id;
+  /** Explicit time values of the series; may be null. */
+  @Nullable
+  protected final Long[] _timeValues;
+  /** Time buckets of the series; may be null. */
+  @Nullable
+  protected final TimeBuckets _timeBuckets;
+  /** Tag (label) names, paired by position with {@link #_tagValues}. */
+  protected final List<String> _tagNames;
+  /** Tag (label) values, paired by position with {@link #_tagNames}. */
+  protected final Object[] _tagValues;
+
+  public BaseTimeSeriesBuilder(String id, @Nullable Long[] timeValues, @Nullable TimeBuckets timeBuckets,
+      List<String> tagNames, Object[] tagValues) {
+    _id = id;
+    _timeBuckets = timeBuckets;
+    _timeValues = timeValues;
+    _tagValues = tagValues;
+    _tagNames = tagNames;
+  }
+
+  /** Accumulates {@code value} into the time bucket at {@code timeBucketIndex}. */
+  public abstract void addValueAtIndex(int timeBucketIndex, Double value);
+
+  /**
+   * String-valued counterpart of {@link #addValueAtIndex(int, Double)}. Builders that accept string input override
+   * this method; the default implementation rejects the input.
+   *
+   * @throws IllegalStateException unless overridden
+   */
+  public void addValueAtIndex(int timeBucketIndex, String value) {
+    throw new IllegalStateException("This aggregation function does not support string input");
+  }
+
+  /** Accumulates {@code value} at the raw time {@code timeValue}. */
+  public abstract void addValue(long timeValue, Double value);
+
+  /**
+   * Feeds every data point of {@code series} into {@link #addValue}. Each value is paired with the series' explicit
+   * time value at the same position.
+   *
+   * @throws NullPointerException if {@code series} has no explicit time values
+   */
+  public void mergeSeries(TimeSeries series) {
+    Double[] values = series.getValues();
+    int numDataPoints = values.length;
+    Long[] timeValues = Objects.requireNonNull(series.getTimeValues(),
+        "Cannot merge series: found null timeValues");
+    for (int pos = 0; pos < numDataPoints; pos++) {
+      addValue(timeValues[pos], values[pos]);
+    }
+  }
+
+  /**
+   * Feeds every data point of {@code series} into {@link #addValueAtIndex}. The value at position {@code i} goes to
+   * time bucket {@code i}.
+   */
+  public void mergeAlignedSeries(TimeSeries series) {
+    Double[] values = series.getValues();
+    int bucketIndex = 0;
+    for (Double value : values) {
+      addValueAtIndex(bucketIndex++, value);
+    }
+  }
+
+  /** Produces the final {@link TimeSeries} from everything accumulated so far. */
+  public abstract TimeSeries build();
+}
diff --git a/pinot-timeseries/pinot-timeseries-spi/src/main/java/org/apache/pinot/tsdb/spi/series/TimeSeries.java b/pinot-timeseries/pinot-timeseries-spi/src/main/java/org/apache/pinot/tsdb/spi/series/TimeSeries.java
new file mode 100644
index 0000000000..5fcf70b42e
--- /dev/null
+++ b/pinot-timeseries/pinot-timeseries-spi/src/main/java/org/apache/pinot/tsdb/spi/series/TimeSeries.java
@@ -0,0 +1,113 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.tsdb.spi.series;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import javax.annotation.Nullable;
+import org.apache.pinot.tsdb.spi.TimeBuckets;
+
+
+/**
+ * Logically, a time-series is a list of pairs of time and data values, where time is stored in increasing order.
+ * A time-series is identified using its ID, which can be retrieved using {@link #getId()}.
+ * A time series typically also has a set of pairs of keys and values which are called tags or labels.
+ * We allow a Series to store time either via {@link TimeBuckets} or via a long array as in {@link #getTimeValues()}.
+ * Using {@link TimeBuckets} is ideal when your queries are working on evenly spaced time ranges. The other option
+ * exists to support use-cases such as "Instant Vectors" in PromQL.
+ * <p>
+ * <b>Warning:</b> The time and value arrays passed to the Series are not copied, and can be modified by anyone with
+ * access to them. This is by design, to make it easier to re-use buffers during time-series operations.
+ * </p>
+ */
+public class TimeSeries {
+  private final String _id;
+  private final Long[] _timeValues;
+  private final TimeBuckets _timeBuckets;
+  private final Double[] _values;
+  private final List<String> _tagNames;
+  private final Object[] _tagValues;
+
+  /**
+   * Creates a series. The arrays are kept as-is (not copied). {@code tagNames} is exposed through an unmodifiable
+   * view.
+   */
+  public TimeSeries(String id, @Nullable Long[] timeValues, @Nullable TimeBuckets timeBuckets, Double[] values,
+      List<String> tagNames, Object[] tagValues) {
+    _id = id;
+    _timeValues = timeValues;
+    _timeBuckets = timeBuckets;
+    _values = values;
+    _tagNames = Collections.unmodifiableList(tagNames);
+    _tagValues = tagValues;
+  }
+
+  /** Returns the identifier of this series. */
+  public String getId() {
+    return _id;
+  }
+
+  /** Returns the explicit time values (the backing array, not a copy), or null if none were given. */
+  @Nullable
+  public Long[] getTimeValues() {
+    return _timeValues;
+  }
+
+  /** Returns the time buckets of this series, or null if none were given. */
+  @Nullable
+  public TimeBuckets getTimeBuckets() {
+    return _timeBuckets;
+  }
+
+  /** Returns the data values (the backing array, not a copy). */
+  public Double[] getValues() {
+    return _values;
+  }
+
+  /** Returns an unmodifiable view of the tag names. */
+  public List<String> getTagNames() {
+    return _tagNames;
+  }
+
+  /** Returns the tag values (the backing array, not a copy), paired by position with {@link #getTagNames()}. */
+  public Object[] getTagValues() {
+    return _tagValues;
+  }
+
+  /**
+   * Returns the tags as a new name-to-value map. Each value is rendered with {@code toString()}, and null values
+   * become {@code "null"}.
+   */
+  public Map<String, String> getTagKeyValuesAsMap() {
+    Map<String, String> tagMap = new HashMap<>();
+    for (int i = 0; i < _tagNames.size(); i++) {
+      tagMap.put(_tagNames.get(i), String.valueOf(_tagValues[i]));
+    }
+    return tagMap;
+  }
+
+  /** Renders the tags as comma-separated {@code name=value} pairs, or {@code "*"} when there are no tags. */
+  public String getTagsSerialized() {
+    if (_tagNames.isEmpty()) {
+      return "*";
+    }
+    StringBuilder serialized = new StringBuilder();
+    String separator = "";
+    for (int i = 0; i < _tagNames.size(); i++) {
+      serialized.append(separator).append(String.format("%s=%s", _tagNames.get(i), _tagValues[i]));
+      separator = ",";
+    }
+    return serialized.toString();
+  }
+
+  /**
+   * Hashes the given array element-wise. {@code Objects.hash} receives the array as its varargs parameter, so this is
+   * {@code Arrays.hashCode(tagNamesAndValues)}, an int widened to long.
+   */
+  // TODO: This can be cleaned up
+  public static long hash(Object[] tagNamesAndValues) {
+    return Objects.hash(tagNamesAndValues);
+  }
+}
diff --git a/pinot-timeseries/pinot-timeseries-spi/src/main/java/org/apache/pinot/tsdb/spi/series/TimeSeriesBlock.java b/pinot-timeseries/pinot-timeseries-spi/src/main/java/org/apache/pinot/tsdb/spi/series/TimeSeriesBlock.java
new file mode 100644
index 0000000000..fe7fd5be42
--- /dev/null
+++ b/pinot-timeseries/pinot-timeseries-spi/src/main/java/org/apache/pinot/tsdb/spi/series/TimeSeriesBlock.java
@@ -0,0 +1,48 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.tsdb.spi.series;
+
+import java.util.List;
+import java.util.Map;
+import org.apache.pinot.tsdb.spi.TimeBuckets;
+
+
+/**
+ * Block used by time series operators.
+ * Series are stored in a map keyed by a {@code long} series key. Note that this is not the {@code String} returned by
+ * {@link TimeSeries#getId()}; it is presumably a tag hash such as {@link TimeSeries#hash(Object[])} (confirm with
+ * callers). The value is a list of series, because some query languages support "union" operations which allow series
+ * with the same tags/labels to exist either in the query response or temporarily during execution before some n-ary
+ * series function is applied.
+ */
+public class TimeSeriesBlock {
+  /** Time buckets this block was created with. */
+  private final TimeBuckets _timeBuckets;
+  /** Series grouped by their {@code long} key; kept as-is (not copied). */
+  private final Map<Long, List<TimeSeries>> _seriesMap;
+
+  public TimeSeriesBlock(TimeBuckets timeBuckets, Map<Long, List<TimeSeries>> seriesMap) {
+    _seriesMap = seriesMap;
+    _timeBuckets = timeBuckets;
+  }
+
+  /** Returns the time buckets this block was created with. */
+  public TimeBuckets getTimeBuckets() {
+    return _timeBuckets;
+  }
+
+  /** Returns the backing series map itself (not a copy). */
+  public Map<Long, List<TimeSeries>> getSeriesMap() {
+    return _seriesMap;
+  }
+}
diff --git a/pinot-timeseries/pinot-timeseries-spi/src/main/java/org/apache/pinot/tsdb/spi/series/TimeSeriesBuilderFactory.java b/pinot-timeseries/pinot-timeseries-spi/src/main/java/org/apache/pinot/tsdb/spi/series/TimeSeriesBuilderFactory.java
new file mode 100644
index 0000000000..088f9b3c85
--- /dev/null
+++ b/pinot-timeseries/pinot-timeseries-spi/src/main/java/org/apache/pinot/tsdb/spi/series/TimeSeriesBuilderFactory.java
@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.
+ * See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.tsdb.spi.series;
+
+import java.util.List;
+import org.apache.pinot.spi.env.PinotConfiguration;
+import org.apache.pinot.tsdb.spi.AggInfo;
+import org.apache.pinot.tsdb.spi.TimeBuckets;
+
+
+/**
+ * Creates {@link BaseTimeSeriesBuilder}s for one query language. {@link TimeSeriesBuilderFactoryProvider} instantiates
+ * implementations reflectively through their public no-arg constructor and then calls
+ * {@link #init(PinotConfiguration)}.
+ */
+public abstract class TimeSeriesBuilderFactory {
+  /**
+   * Configures this factory. {@link TimeSeriesBuilderFactoryProvider} calls it right after construction with the
+   * configuration subset scoped to this factory's language.
+   */
+  public abstract void init(PinotConfiguration pinotConfiguration);
+
+  /**
+   * Returns a new builder for a single series.
+   *
+   * @param aggInfo aggregation to apply
+   * @param id identifier of the series
+   * @param timeBuckets time buckets of the series
+   * @param tagNames tag (label) names of the series
+   * @param tagValues tag (label) values, paired by position with {@code tagNames}
+   */
+  public abstract BaseTimeSeriesBuilder newTimeSeriesBuilder(AggInfo aggInfo, String id, TimeBuckets timeBuckets,
+      List<String> tagNames, Object[] tagValues);
+}
diff --git a/pinot-timeseries/pinot-timeseries-spi/src/main/java/org/apache/pinot/tsdb/spi/series/TimeSeriesBuilderFactoryProvider.java b/pinot-timeseries/pinot-timeseries-spi/src/main/java/org/apache/pinot/tsdb/spi/series/TimeSeriesBuilderFactoryProvider.java
new file mode 100644
index 0000000000..b757579839
--- /dev/null
+++ b/pinot-timeseries/pinot-timeseries-spi/src/main/java/org/apache/pinot/tsdb/spi/series/TimeSeriesBuilderFactoryProvider.java
@@ -0,0 +1,63 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.tsdb.spi.series;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Objects;
+import org.apache.pinot.spi.env.PinotConfiguration;
+import org.apache.pinot.tsdb.spi.PinotTimeSeriesConfiguration;
+
+
+/**
+ * Loads all series builder providers for all configured time-series query languages.
+ * <p>
+ * Languages are read as a comma-separated list from
+ * {@link PinotTimeSeriesConfiguration#getEnabledLanguagesConfigKey()}. For each language, the factory class name is
+ * read from {@link PinotTimeSeriesConfiguration#getSeriesBuilderFactoryConfigKey(String)}. The class is instantiated
+ * through its public no-arg constructor and initialized with the configuration subset under
+ * {@code PinotTimeSeriesConfiguration.CONFIG_PREFIX + "." + language}.
+ * </p>
+ */
+public class TimeSeriesBuilderFactoryProvider {
+  private static final Map<String, TimeSeriesBuilderFactory> FACTORY_MAP = new HashMap<>();
+
+  private TimeSeriesBuilderFactoryProvider() {
+  }
+
+  /**
+   * Creates and initializes the series builder factory of every enabled language.
+   * <p>
+   * Blank entries are skipped, so an unset or empty language list loads nothing. Whitespace around each language name
+   * is stripped, e.g. {@code "promql, m3ql"} enables {@code promql} and {@code m3ql}.
+   * </p>
+   *
+   * @throws RuntimeException if a language has no factory class configured, the class cannot be instantiated, or it
+   *     does not extend {@link TimeSeriesBuilderFactory}
+   */
+  public static void init(PinotConfiguration pinotConfiguration) {
+    String[] languages = pinotConfiguration.getProperty(PinotTimeSeriesConfiguration.getEnabledLanguagesConfigKey(), "")
+        .split(",");
+    for (String rawLanguage : languages) {
+      // "".split(",") returns [""]. Without this check, an unset language list would try to load a factory for the
+      // empty language and fail on the missing class name.
+      String language = rawLanguage.strip();
+      if (language.isEmpty()) {
+        continue;
+      }
+      FACTORY_MAP.put(language, createFactory(pinotConfiguration, language));
+    }
+  }
+
+  /**
+   * Returns the factory registered for {@code engine} by {@link #init(PinotConfiguration)}.
+   *
+   * @throws NullPointerException if no factory was registered for {@code engine}
+   */
+  public static TimeSeriesBuilderFactory getSeriesBuilderFactory(String engine) {
+    return Objects.requireNonNull(FACTORY_MAP.get(engine),
+        "No series builder factory found for engine: " + engine);
+  }
+
+  /** Reflectively instantiates the factory configured for {@code language} and initializes it. */
+  private static TimeSeriesBuilderFactory createFactory(PinotConfiguration pinotConfiguration, String language) {
+    String seriesBuilderClass = pinotConfiguration
+        .getProperty(PinotTimeSeriesConfiguration.getSeriesBuilderFactoryConfigKey(language));
+    if (seriesBuilderClass == null || seriesBuilderClass.isBlank()) {
+      throw new IllegalArgumentException("No series builder factory class configured for language: " + language);
+    }
+    Object untypedSeriesBuilderFactory;
+    try {
+      untypedSeriesBuilderFactory = Class.forName(seriesBuilderClass).getConstructor().newInstance();
+    } catch (ReflectiveOperationException e) {
+      throw new RuntimeException("Failed to instantiate series builder factory class " + seriesBuilderClass
+          + " for language: " + language, e);
+    }
+    if (!(untypedSeriesBuilderFactory instanceof TimeSeriesBuilderFactory)) {
+      throw new IllegalArgumentException("Series builder factory class " + seriesBuilderClass
+          + " does not extend TimeSeriesBuilderFactory");
+    }
+    TimeSeriesBuilderFactory seriesBuilderFactory = (TimeSeriesBuilderFactory) untypedSeriesBuilderFactory;
+    seriesBuilderFactory.init(pinotConfiguration.subset(PinotTimeSeriesConfiguration.CONFIG_PREFIX + "." + language));
+    return seriesBuilderFactory;
+  }
+}
diff --git a/pinot-timeseries/pinot-timeseries-spi/src/main/java/org/apache/pinot/tsdb/spi/series/builders/MaxTimeSeriesBuilder.java b/pinot-timeseries/pinot-timeseries-spi/src/main/java/org/apache/pinot/tsdb/spi/series/builders/MaxTimeSeriesBuilder.java
new file mode 100644
index 0000000000..742b1b32c6
--- /dev/null
+++ b/pinot-timeseries/pinot-timeseries-spi/src/main/java/org/apache/pinot/tsdb/spi/series/builders/MaxTimeSeriesBuilder.java
@@ -0,0 +1,57 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.tsdb.spi.series.builders;
+
+import java.util.List;
+import org.apache.pinot.tsdb.spi.TimeBuckets;
+import org.apache.pinot.tsdb.spi.series.BaseTimeSeriesBuilder;
+import org.apache.pinot.tsdb.spi.series.TimeSeries;
+
+
+/**
+ * MaxSeriesBuilder is a series builder that computes the maximum value in each time bucket.
+ * <b>Context:</b>We provide some ready to use implementations for some of the most common use-cases in the SPI. This
+ * reduces redundancy and also serves as a reference implementation for language developers.
+ * <p>
+ * {@code null} input values are treated as "no data" and ignored, so merging series whose empty buckets are
+ * {@code null} does not fail.
+ * </p>
+ */
+public class MaxTimeSeriesBuilder extends BaseTimeSeriesBuilder {
+  /** Running maximum per time bucket; {@code null} until the bucket receives a value. */
+  private final Double[] _values;
+
+  public MaxTimeSeriesBuilder(String id, TimeBuckets timeBuckets, List<String> tagNames, Object[] tagValues) {
+    super(id, null, timeBuckets, tagNames, tagValues);
+    _values = new Double[timeBuckets.getNumBuckets()];
+  }
+
+  @Override
+  public void addValueAtIndex(int timeBucketIndex, Double value) {
+    // Skip nulls: build() leaves empty buckets as null and mergeAlignedSeries forwards them. Comparing a null Double
+    // would otherwise throw a NullPointerException.
+    if (value == null) {
+      return;
+    }
+    Double current = _values[timeBucketIndex];
+    if (current == null || value > current) {
+      _values[timeBucketIndex] = value;
+    }
+  }
+
+  @Override
+  public void addValue(long timeValue, Double value) {
+    int timeBucketIndex = _timeBuckets.resolveIndex(timeValue);
+    addValueAtIndex(timeBucketIndex, value);
+  }
+
+  /** Returns the per-bucket maxima; buckets that never received a value are {@code null}. */
+  @Override
+  public TimeSeries build() {
+    return new TimeSeries(_id, null, _timeBuckets, _values, _tagNames, _tagValues);
+  }
+}
diff --git a/pinot-timeseries/pinot-timeseries-spi/src/main/java/org/apache/pinot/tsdb/spi/series/builders/MinTimeSeriesBuilder.java b/pinot-timeseries/pinot-timeseries-spi/src/main/java/org/apache/pinot/tsdb/spi/series/builders/MinTimeSeriesBuilder.java
new file mode 100644
index 0000000000..93cdab77d4
--- /dev/null
+++ b/pinot-timeseries/pinot-timeseries-spi/src/main/java/org/apache/pinot/tsdb/spi/series/builders/MinTimeSeriesBuilder.java
@@ -0,0 +1,57 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.tsdb.spi.series.builders;
+
+import java.util.List;
+import org.apache.pinot.tsdb.spi.TimeBuckets;
+import org.apache.pinot.tsdb.spi.series.BaseTimeSeriesBuilder;
+import org.apache.pinot.tsdb.spi.series.TimeSeries;
+
+
+/**
+ * {@code MinTimeSeriesBuilder} is a series builder that computes the minimum value in each time bucket.
+ * <b>Context:</b>We provide some ready to use implementations for some of the most common use-cases in the SPI. This
+ * reduces redundancy and also serves as a reference implementation for language developers.
+ * <p>
+ * {@code null} input values are treated as "no data" and ignored, so merging series whose empty buckets are
+ * {@code null} does not fail.
+ * </p>
+ */
+public class MinTimeSeriesBuilder extends BaseTimeSeriesBuilder {
+  /** Running minimum per time bucket; {@code null} until the bucket receives a value. */
+  private final Double[] _values;
+
+  public MinTimeSeriesBuilder(String id, TimeBuckets timeBuckets, List<String> tagNames, Object[] tagValues) {
+    super(id, null, timeBuckets, tagNames, tagValues);
+    _values = new Double[timeBuckets.getNumBuckets()];
+  }
+
+  @Override
+  public void addValueAtIndex(int timeBucketIndex, Double value) {
+    // Skip nulls: build() leaves empty buckets as null and mergeAlignedSeries forwards them. Comparing a null Double
+    // would otherwise throw a NullPointerException.
+    if (value == null) {
+      return;
+    }
+    Double current = _values[timeBucketIndex];
+    if (current == null || value < current) {
+      _values[timeBucketIndex] = value;
+    }
+  }
+
+  @Override
+  public void addValue(long timeValue, Double value) {
+    int timeBucketIndex = _timeBuckets.resolveIndex(timeValue);
+    addValueAtIndex(timeBucketIndex, value);
+  }
+
+  /** Returns the per-bucket minima; buckets that never received a value are {@code null}. */
+  @Override
+  public TimeSeries build() {
+    return new TimeSeries(_id, null, _timeBuckets, _values, _tagNames, _tagValues);
+  }
+}
diff --git a/pinot-timeseries/pinot-timeseries-spi/src/main/java/org/apache/pinot/tsdb/spi/series/builders/SummingTimeSeriesBuilder.java
b/pinot-timeseries/pinot-timeseries-spi/src/main/java/org/apache/pinot/tsdb/spi/series/builders/SummingTimeSeriesBuilder.java
new file mode 100644
index 0000000000..2cf723b8e4
--- /dev/null
+++ b/pinot-timeseries/pinot-timeseries-spi/src/main/java/org/apache/pinot/tsdb/spi/series/builders/SummingTimeSeriesBuilder.java
@@ -0,0 +1,55 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.tsdb.spi.series.builders;
+
+import java.util.List;
+import org.apache.pinot.tsdb.spi.TimeBuckets;
+import org.apache.pinot.tsdb.spi.series.BaseTimeSeriesBuilder;
+import org.apache.pinot.tsdb.spi.series.TimeSeries;
+
+
+/**
+ * {@code SummingTimeSeriesBuilder} is a series builder that computes the sum of all values in each time bucket.
+ * <b>Context:</b>We provide some ready to use implementations for some of the most common use-cases in the SPI. This
+ * reduces redundancy and also serves as a reference implementation for language developers.
+ * <p>
+ * {@code null} input values are treated as "no data" and ignored, so merging series whose empty buckets are
+ * {@code null} does not fail.
+ * </p>
+ */
+public class SummingTimeSeriesBuilder extends BaseTimeSeriesBuilder {
+  /** Running sum per time bucket; {@code null} until the bucket receives a value. */
+  private final Double[] _values;
+
+  public SummingTimeSeriesBuilder(String id, TimeBuckets timeBuckets, List<String> tagNames, Object[] tagValues) {
+    super(id, null, timeBuckets, tagNames, tagValues);
+    _values = new Double[timeBuckets.getNumBuckets()];
+  }
+
+  @Override
+  public void addValueAtIndex(int timeBucketIndex, Double value) {
+    // Skip nulls: build() leaves empty buckets as null and mergeAlignedSeries forwards them. Unboxing a null Double
+    // would otherwise throw a NullPointerException.
+    if (value == null) {
+      return;
+    }
+    Double current = _values[timeBucketIndex];
+    _values[timeBucketIndex] = (current == null ? 0.0 : current) + value;
+  }
+
+  @Override
+  public void addValue(long timeValue, Double value) {
+    int timeBucketIndex = _timeBuckets.resolveIndex(timeValue);
+    addValueAtIndex(timeBucketIndex, value);
+  }
+
+  /** Returns the per-bucket sums; buckets that never received a value are {@code null}. */
+  @Override
+  public TimeSeries build() {
+    return new TimeSeries(_id, null, _timeBuckets, _values, _tagNames, _tagValues);
+  }
+}
diff --git a/pinot-timeseries/pinot-timeseries-spi/src/test/java/org/apache/pinot/tsdb/spi/plan/serde/TimeSeriesPlanSerdeTest.java b/pinot-timeseries/pinot-timeseries-spi/src/test/java/org/apache/pinot/tsdb/spi/plan/serde/TimeSeriesPlanSerdeTest.java
new file mode 100644
index 0000000000..ff74b6ef35
--- /dev/null
+++ b/pinot-timeseries/pinot-timeseries-spi/src/test/java/org/apache/pinot/tsdb/spi/plan/serde/TimeSeriesPlanSerdeTest.java
@@ -0,0 +1,52 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.tsdb.spi.plan.serde;
+
+import java.util.ArrayList;
+import java.util.concurrent.TimeUnit;
+import org.apache.pinot.tsdb.spi.AggInfo;
+import org.apache.pinot.tsdb.spi.plan.BaseTimeSeriesPlanNode;
+import org.apache.pinot.tsdb.spi.plan.ScanFilterAndProjectPlanNode;
+import org.testng.annotations.Test;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNotNull;
+import static org.testng.Assert.assertThrows;
+import static org.testng.Assert.assertTrue;
+
+
+/** Round-trip and error-path tests for {@link TimeSeriesPlanSerde}. */
+public class TimeSeriesPlanSerdeTest {
+  @Test
+  public void testSerdeForScanFilterProjectNode() {
+    ScanFilterAndProjectPlanNode scanFilterAndProjectPlanNode = new ScanFilterAndProjectPlanNode(
+        "sfp#0", new ArrayList<>(), "myTable", "myTimeColumn", TimeUnit.MILLISECONDS,
+        0L, "myFilterExpression", "myValueExpression",
+        new AggInfo("SUM"), new ArrayList<>()
+    );
+    BaseTimeSeriesPlanNode planNode =
+        TimeSeriesPlanSerde.deserialize(TimeSeriesPlanSerde.serialize(scanFilterAndProjectPlanNode));
+    assertTrue(planNode instanceof ScanFilterAndProjectPlanNode);
+    ScanFilterAndProjectPlanNode deserializedNode = (ScanFilterAndProjectPlanNode) planNode;
+    assertEquals(deserializedNode.getTableName(), "myTable");
+    assertEquals(deserializedNode.getTimeColumn(), "myTimeColumn");
+    assertEquals(deserializedNode.getTimeUnit(), TimeUnit.MILLISECONDS);
+    assertEquals(deserializedNode.getOffset(), 0L);
+    assertEquals(deserializedNode.getFilterExpression(), "myFilterExpression");
+    assertEquals(deserializedNode.getValueExpression(), "myValueExpression");
+    assertNotNull(deserializedNode.getAggInfo());
+    assertEquals(deserializedNode.getGroupByColumns().size(), 0);
+  }
+
+  @Test
+  public void testDeserializeRejectsInvalidPlans() {
+    // Without a class name, the node cannot be resolved to a plan type.
+    assertThrows(RuntimeException.class, () -> TimeSeriesPlanSerde.deserialize("{}"));
+    // A class that is not a plan node must not be turned into one.
+    assertThrows(RuntimeException.class,
+        () -> TimeSeriesPlanSerde.deserialize("{\"klass\":\"java.lang.String\"}"));
+  }
+}
diff --git
a/pinot-timeseries/pom.xml b/pinot-timeseries/pom.xml new file mode 100644 index 0000000000..f49d0cb922 --- /dev/null +++ b/pinot-timeseries/pom.xml @@ -0,0 +1,44 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + + Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, + software distributed under the License is distributed on an + "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + KIND, either express or implied. See the License for the + specific language governing permissions and limitations + under the License. + +--> +<project xmlns="http://maven.apache.org/POM/4.0.0" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <modelVersion>4.0.0</modelVersion> + <parent> + <groupId>org.apache.pinot</groupId> + <artifactId>pinot</artifactId> + <version>1.3.0-SNAPSHOT</version> + <relativePath>..</relativePath> + </parent> + <packaging>pom</packaging> + + <artifactId>pinot-timeseries</artifactId> + + <properties> + <pinot.root>${basedir}/..</pinot.root> + </properties> + + <modules> + <module>pinot-timeseries-spi</module> + </modules> + +</project> diff --git a/pom.xml b/pom.xml index 066b542250..3f701f474f 100644 --- a/pom.xml +++ b/pom.xml @@ -58,6 +58,7 @@ <module>pinot-compatibility-verifier</module> <module>pinot-query-planner</module> <module>pinot-query-runtime</module> + <module>pinot-timeseries</module> </modules> <licenses> 
--------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org