raghavyadav01 commented on code in PR #14048: URL: https://github.com/apache/pinot/pull/14048#discussion_r1771867719
########## pinot-broker/src/main/java/org/apache/pinot/broker/api/resources/PinotClientRequest.java: ########## @@ -236,6 +238,48 @@ public void processSqlWithMultiStageQueryEnginePost(String query, @Suspended Asy } } + @GET + @ManagedAsync + @Produces(MediaType.APPLICATION_JSON) + @Path("timeseries/api/v1/query_range") + @ApiOperation(value = "Prometheus Compatible API for Pinot's Time Series Engine") + @ManualAuthorization + public void processTimeSeriesQueryEngine(@Suspended AsyncResponse asyncResponse, + @QueryParam("language") String language, + @Context org.glassfish.grizzly.http.server.Request requestCtx, + @Context HttpHeaders httpHeaders) { + try { + try (RequestScope requestContext = Tracing.getTracer().createRequestScope()) { + String queryString = requestCtx.getQueryString(); + PinotBrokerTimeSeriesResponse response = executeTimeSeriesQuery(language, queryString, requestContext); + if (response.getErrorType() != null && !response.getErrorType().isEmpty()) { + asyncResponse.resume(Response.serverError().entity(response).build()); + return; + } + asyncResponse.resume(response); + } + } catch (Exception e) { + LOGGER.error("Caught exception while processing POST request", e); Review Comment: Where do we translate failures such as an invalid parameter or a validation error in the query planner into HTTP error codes? ########## pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/TimeSeriesRequestHandler.java: ########## @@ -0,0 +1,172 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.broker.requesthandler; + +import com.fasterxml.jackson.databind.JsonNode; +import com.google.common.base.Preconditions; +import java.net.URI; +import java.net.URISyntaxException; +import java.time.Duration; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.Executor; +import javax.annotation.Nullable; +import javax.ws.rs.core.HttpHeaders; +import org.apache.commons.lang3.StringUtils; +import org.apache.hc.client5.http.io.HttpClientConnectionManager; +import org.apache.http.NameValuePair; +import org.apache.http.client.utils.URLEncodedUtils; +import org.apache.pinot.broker.api.AccessControl; +import org.apache.pinot.broker.api.RequesterIdentity; +import org.apache.pinot.broker.broker.AccessControlFactory; +import org.apache.pinot.broker.queryquota.QueryQuotaManager; +import org.apache.pinot.broker.routing.BrokerRoutingManager; +import org.apache.pinot.common.config.provider.TableCache; +import org.apache.pinot.common.response.BrokerResponse; +import org.apache.pinot.common.response.PinotBrokerTimeSeriesResponse; +import org.apache.pinot.common.utils.HumanReadableDuration; +import org.apache.pinot.query.service.dispatch.QueryDispatcher; +import org.apache.pinot.spi.env.PinotConfiguration; +import org.apache.pinot.spi.trace.RequestContext; +import org.apache.pinot.sql.parsers.SqlNodeAndOptions; +import org.apache.pinot.tsdb.planner.TimeSeriesQueryEnvironment; +import org.apache.pinot.tsdb.planner.physical.TimeSeriesDispatchablePlan; +import 
org.apache.pinot.tsdb.spi.RangeTimeSeriesRequest; +import org.apache.pinot.tsdb.spi.TimeSeriesLogicalPlanResult; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +public class TimeSeriesRequestHandler extends BaseBrokerRequestHandler { + private static final Logger LOGGER = LoggerFactory.getLogger(TimeSeriesRequestHandler.class); + private static final long DEFAULT_STEP_SECONDS = 60L; + private final TimeSeriesQueryEnvironment _queryEnvironment; + private final QueryDispatcher _queryDispatcher; + + public TimeSeriesRequestHandler(PinotConfiguration config, String brokerId, BrokerRoutingManager routingManager, + AccessControlFactory accessControlFactory, QueryQuotaManager queryQuotaManager, TableCache tableCache, + QueryDispatcher queryDispatcher) { + super(config, brokerId, routingManager, accessControlFactory, queryQuotaManager, tableCache); + _queryEnvironment = new TimeSeriesQueryEnvironment(config, routingManager, tableCache); + _queryEnvironment.init(config); + _queryDispatcher = queryDispatcher; + } + + @Override + protected BrokerResponse handleRequest(long requestId, String query, @Nullable SqlNodeAndOptions sqlNodeAndOptions, + JsonNode request, @Nullable RequesterIdentity requesterIdentity, RequestContext requestContext, + @Nullable HttpHeaders httpHeaders, AccessControl accessControl) + throws Exception { + throw new IllegalArgumentException("Not supported yet"); + } + + @Override + public void start() { + LOGGER.info("Starting time-series request handler"); + } + + @Override + public void shutDown() { + LOGGER.info("Shutting down time-series request handler"); + } + + @Override + public PinotBrokerTimeSeriesResponse handleTimeSeriesRequest(String lang, String rawQueryParamString, + RequestContext requestContext) { + RangeTimeSeriesRequest timeSeriesRequest = null; + try { + timeSeriesRequest = buildRangeTimeSeriesRequest(lang, rawQueryParamString); + } catch (URISyntaxException e) { + throw new RuntimeException(e); + } + 
TimeSeriesLogicalPlanResult logicalPlanResult = _queryEnvironment.buildLogicalPlan(timeSeriesRequest); + TimeSeriesDispatchablePlan dispatchablePlan = _queryEnvironment.buildPhysicalPlan(timeSeriesRequest, requestContext, + logicalPlanResult); + return _queryDispatcher.submitAndGet(requestContext, dispatchablePlan, timeSeriesRequest.getTimeout().toMillis(), + new HashMap<>()); + } + + @Override + public Map<Long, String> getRunningQueries() { + // TODO: Implement this. + return Map.of(); + } + + @Override + public boolean cancelQuery(long queryId, int timeoutMs, Executor executor, HttpClientConnectionManager connMgr, + Map<String, Integer> serverResponses) + throws Exception { + // TODO: Implement this. + return false; + } + + private RangeTimeSeriesRequest buildRangeTimeSeriesRequest(String language, String queryParamString) + throws URISyntaxException { + List<NameValuePair> pairs = URLEncodedUtils.parse( + new URI("http://localhost?" + queryParamString), "UTF-8"); + String query = null; + Long startTs = null; + Long endTs = null; + String step = null; + String timeoutStr = null; + for (NameValuePair nameValuePair : pairs) { + switch (nameValuePair.getName()) { + case "query": + query = nameValuePair.getValue(); + break; + case "start": + startTs = Long.parseLong(nameValuePair.getValue()); + break; + case "end": + endTs = Long.parseLong(nameValuePair.getValue()); + break; + case "step": + step = nameValuePair.getValue(); + break; + case "timeout": + timeoutStr = nameValuePair.getValue(); + break; + default: + /* Okay to ignore unknown parameters since the language implementor may be using them. 
*/ + break; + } + } + Preconditions.checkNotNull(query, "Query cannot be null"); + Preconditions.checkNotNull(startTs, "Start time cannot be null"); + Preconditions.checkNotNull(endTs, "End time cannot be null"); + Duration timeout = Duration.ofMillis(_brokerTimeoutMs); + if (StringUtils.isNotBlank(timeoutStr)) { + timeout = HumanReadableDuration.from(timeoutStr); + } + // TODO: Pass full raw query param string to the request Review Comment: This is handled now, correct? ########## pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/QueryRunner.java: ########## @@ -210,6 +235,69 @@ public void processQuery(WorkerMetadata workerMetadata, StagePlan stagePlan, Map _opChainScheduler.register(opChain); } + /** + * Receives a serialized plan sent by the broker, and runs it to completion, blocking the thread until the execution Review Comment: Is there a timeout on the request in this thread, or can it hang forever? ########## pinot-query-runtime/src/main/java/org/apache/pinot/query/service/dispatch/QueryDispatcher.java: ########## @@ -245,6 +323,14 @@ private DispatchClient getOrCreateDispatchClient(QueryServerInstance queryServer return _dispatchClientMap.computeIfAbsent(key, k -> new DispatchClient(hostname, port)); } + private TimeSeriesDispatchClient getOrCreateTimeSeriesDispatchClient( + TimeSeriesQueryServerInstance queryServerInstance) { + String hostname = queryServerInstance.getHostname(); + int port = queryServerInstance.getQueryServicePort(); + String key = String.format("%s_%d", hostname, port); Review Comment: Is this key generation similar to the SQL query dispatcher? I am wondering what the side effect would be if the hostname changes for a pod, especially in a Kubernetes cluster. 
########## pinot-broker/src/main/java/org/apache/pinot/broker/api/resources/PinotClientRequest.java: ########## @@ -236,6 +238,48 @@ public void processSqlWithMultiStageQueryEnginePost(String query, @Suspended Asy } } + @GET + @ManagedAsync + @Produces(MediaType.APPLICATION_JSON) + @Path("timeseries/api/v1/query_range") + @ApiOperation(value = "Prometheus Compatible API for Pinot's Time Series Engine") + @ManualAuthorization + public void processTimeSeriesQueryEngine(@Suspended AsyncResponse asyncResponse, Review Comment: I noticed these API's are not part of Swagger endpoint. Do we need to make additional changes to include in swagger console in pinot? ########## pinot-plugins/pinot-timeseries-lang/pinot-timeseries-m3ql/src/main/java/org/apache/pinot/tsdb/m3ql/time/TimeBucketComputer.java: ########## @@ -0,0 +1,79 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.pinot.tsdb.m3ql.time; + +import java.time.Duration; +import java.util.Collection; +import org.apache.pinot.tsdb.spi.RangeTimeSeriesRequest; +import org.apache.pinot.tsdb.spi.TimeBuckets; +import org.apache.pinot.tsdb.spi.plan.BaseTimeSeriesPlanNode; +import org.apache.pinot.tsdb.spi.plan.ScanFilterAndProjectPlanNode; + + +public class TimeBucketComputer { + private TimeBucketComputer() { + } + + public static TimeBuckets compute(BaseTimeSeriesPlanNode planNode, RangeTimeSeriesRequest request) { + QueryTimeBoundaryConstraints constraints = process(planNode, request); + long newStartTime = request.getStartSeconds() - constraints.getLeftExtensionSeconds(); Review Comment: For my understanding, How are these constraints defined in M3QL? ########## pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/timeseries/TimeSeriesExecutionContext.java: ########## @@ -0,0 +1,49 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.pinot.query.runtime.timeseries; + +import java.util.List; +import java.util.Map; +import org.apache.pinot.tsdb.spi.TimeBuckets; + + +public class TimeSeriesExecutionContext { + private final String _language; + private final TimeBuckets _initialTimeBuckets; Review Comment: Why "initial"? Can these TimeBuckets change while execution is happening? ########## pinot-common/src/main/java/org/apache/pinot/common/response/PinotBrokerTimeSeriesResponse.java: ########## @@ -0,0 +1,168 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.pinot.common.response; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonInclude; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.base.Preconditions; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import org.apache.pinot.spi.annotations.InterfaceStability; +import org.apache.pinot.tsdb.spi.series.TimeSeries; +import org.apache.pinot.tsdb.spi.series.TimeSeriesBlock; + + +/** + * POJO returned by the Pinot broker in a time-series query response. Format is defined + * <a href="https://prometheus.io/docs/prometheus/latest/querying/api/">in the prometheus docs.</a> + * TODO: We will evolve this until Pinot 1.3. At present we are mimicking Prometheus HTTP API. + */ +@InterfaceStability.Evolving +public class PinotBrokerTimeSeriesResponse { + public static final String SUCCESS_STATUS = "success"; + public static final String ERROR_STATUS = "error"; + public static final String METRIC_NAME_KEY = "__name__"; + private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); + private String _status; + private Data _data; + private String _errorType; + private String _error; + + static { + OBJECT_MAPPER.setSerializationInclusion(JsonInclude.Include.NON_NULL); + } + + @JsonCreator + public PinotBrokerTimeSeriesResponse(@JsonProperty("status") String status, @JsonProperty("data") Data data, + @JsonProperty("errorType") String errorType, @JsonProperty("error") String error) { + _status = status; + _data = data; + _errorType = errorType; + _error = error; + } + + public String getStatus() { + return _status; + } + + public Data getData() { + return _data; + } + + public String getErrorType() { + return _errorType; + } + + public String getError() { + return _error; + } + + 
public String serialize() + throws JsonProcessingException { + return OBJECT_MAPPER.writeValueAsString(this); + } + + public static PinotBrokerTimeSeriesResponse newSuccessResponse(Data data) { + return new PinotBrokerTimeSeriesResponse(SUCCESS_STATUS, data, null, null); + } + + public static PinotBrokerTimeSeriesResponse newErrorResponse(String errorType, String errorMessage) { + return new PinotBrokerTimeSeriesResponse(ERROR_STATUS, Data.EMPTY, errorType, errorMessage); + } + + public static PinotBrokerTimeSeriesResponse fromTimeSeriesBlock(TimeSeriesBlock seriesBlock) { + if (seriesBlock.getTimeBuckets() == null) { + throw new UnsupportedOperationException("Non-bucketed series block not supported yet"); + } + return convertBucketedSeriesBlock(seriesBlock); + } + + private static PinotBrokerTimeSeriesResponse convertBucketedSeriesBlock(TimeSeriesBlock seriesBlock) { + Long[] timeValues = Objects.requireNonNull(seriesBlock.getTimeBuckets()).getTimeBuckets(); + List<PinotBrokerTimeSeriesResponse.Value> result = new ArrayList<>(); + for (var listOfTimeSeries : seriesBlock.getSeriesMap().values()) { + Preconditions.checkState(!listOfTimeSeries.isEmpty(), "Received empty time-series"); + TimeSeries anySeries = listOfTimeSeries.get(0); + // TODO: Ideally we should allow "aliasing" a series, and hence we should store something like a series-name. + // Though in most contexts that would serve the same purpose as an ID. + Map<String, String> metricMap = new HashMap<>(); + metricMap.put(METRIC_NAME_KEY, anySeries.getTagsSerialized()); + metricMap.putAll(anySeries.getTagKeyValuesAsMap()); + for (TimeSeries timeSeries : listOfTimeSeries) { + Object[][] values = new Object[timeValues.length][]; + for (int i = 0; i < timeValues.length; i++) { + Object nullableValue = timeSeries.getValues()[i]; + values[i] = new Object[]{timeValues[i], nullableValue == null ? 
null : nullableValue.toString()}; + } + result.add(new PinotBrokerTimeSeriesResponse.Value(metricMap, values)); + } + } + PinotBrokerTimeSeriesResponse.Data data = PinotBrokerTimeSeriesResponse.Data.newMatrix(result); + return PinotBrokerTimeSeriesResponse.newSuccessResponse(data); + } + + public static class Data { + public static final Data EMPTY = new Data("", new ArrayList<>()); + private final String _resultType; + private final List<Value> _result; + + @JsonCreator + public Data(@JsonProperty("resultType") String resultType, @JsonProperty("result") List<Value> result) { + _resultType = resultType; + _result = result; + } + + public String getResultType() { + return _resultType; + } + + public List<Value> getResult() { + return _result; + } + + public static Data newMatrix(List<Value> result) { Review Comment: We will need to add other types "vector", "scalar" , "string" as well for prometheus. ########## pinot-timeseries/pinot-timeseries-spi/src/main/java/org/apache/pinot/tsdb/spi/series/TimeSeriesBlock.java: ########## @@ -33,11 +34,12 @@ public class TimeSeriesBlock { private final TimeBuckets _timeBuckets; private final Map<Long, List<TimeSeries>> _seriesMap; - public TimeSeriesBlock(TimeBuckets timeBuckets, Map<Long, List<TimeSeries>> seriesMap) { + public TimeSeriesBlock(@Nullable TimeBuckets timeBuckets, Map<Long, List<TimeSeries>> seriesMap) { Review Comment: Why timebucket is nullable? What will happen if TimeBucket is null? Is this for Instant query use case? ########## pinot-plugins/pinot-timeseries-lang/pinot-timeseries-m3ql/src/main/java/org/apache/pinot/tsdb/m3ql/M3TimeSeriesPlanner.java: ########## @@ -0,0 +1,158 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.tsdb.m3ql; + +import com.google.common.base.Preconditions; +import java.util.ArrayList; +import java.util.List; +import java.util.Locale; +import java.util.Map; +import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; +import java.util.stream.Stream; +import org.apache.pinot.tsdb.m3ql.parser.Tokenizer; +import org.apache.pinot.tsdb.m3ql.plan.KeepLastValuePlanNode; +import org.apache.pinot.tsdb.m3ql.plan.TransformNullPlanNode; +import org.apache.pinot.tsdb.m3ql.time.TimeBucketComputer; +import org.apache.pinot.tsdb.spi.AggInfo; +import org.apache.pinot.tsdb.spi.RangeTimeSeriesRequest; +import org.apache.pinot.tsdb.spi.TimeBuckets; +import org.apache.pinot.tsdb.spi.TimeSeriesLogicalPlanResult; +import org.apache.pinot.tsdb.spi.TimeSeriesLogicalPlanner; +import org.apache.pinot.tsdb.spi.plan.BaseTimeSeriesPlanNode; +import org.apache.pinot.tsdb.spi.plan.ScanFilterAndProjectPlanNode; + + +public class M3TimeSeriesPlanner implements TimeSeriesLogicalPlanner { + @Override + public void init(Map<String, Object> config) { + } + + @Override + public TimeSeriesLogicalPlanResult plan(RangeTimeSeriesRequest request) { + if (!request.getLanguage().equals(Constants.LANGUAGE)) { + throw new IllegalArgumentException(String.format("Invalid engine id: %s. 
Expected: %s", request.getLanguage(), + Constants.LANGUAGE)); + } + // Step-1: Parse and create a logical plan tree. + BaseTimeSeriesPlanNode planNode = planQuery(request); + // Step-2: Compute the time-buckets. + TimeBuckets timeBuckets = TimeBucketComputer.compute(planNode, request); + return new TimeSeriesLogicalPlanResult(planNode, timeBuckets); + } + + public BaseTimeSeriesPlanNode planQuery(RangeTimeSeriesRequest request) { + PlanIdGenerator planIdGenerator = new PlanIdGenerator(); + Tokenizer tokenizer = new Tokenizer(request.getQuery()); + List<List<String>> commands = tokenizer.tokenize(); + Preconditions.checkState(commands.size() > 1, "At least two commands required. " + + "Query should start with a fetch followed by an aggregation."); + BaseTimeSeriesPlanNode lastNode = null; + AggInfo aggInfo = null; + List<String> groupByColumns = new ArrayList<>(); + BaseTimeSeriesPlanNode rootNode = null; + for (int commandId = commands.size() - 1; commandId >= 0; commandId--) { Review Comment: Do you have plan to have AST based grammar for M3QL or is this sufficient for M3QL? ########## pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/TimeSeriesRequestHandler.java: ########## @@ -0,0 +1,172 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.broker.requesthandler; + +import com.fasterxml.jackson.databind.JsonNode; +import com.google.common.base.Preconditions; +import java.net.URI; +import java.net.URISyntaxException; +import java.time.Duration; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.Executor; +import javax.annotation.Nullable; +import javax.ws.rs.core.HttpHeaders; +import org.apache.commons.lang3.StringUtils; +import org.apache.hc.client5.http.io.HttpClientConnectionManager; +import org.apache.http.NameValuePair; +import org.apache.http.client.utils.URLEncodedUtils; +import org.apache.pinot.broker.api.AccessControl; +import org.apache.pinot.broker.api.RequesterIdentity; +import org.apache.pinot.broker.broker.AccessControlFactory; +import org.apache.pinot.broker.queryquota.QueryQuotaManager; +import org.apache.pinot.broker.routing.BrokerRoutingManager; +import org.apache.pinot.common.config.provider.TableCache; +import org.apache.pinot.common.response.BrokerResponse; +import org.apache.pinot.common.response.PinotBrokerTimeSeriesResponse; +import org.apache.pinot.common.utils.HumanReadableDuration; +import org.apache.pinot.query.service.dispatch.QueryDispatcher; +import org.apache.pinot.spi.env.PinotConfiguration; +import org.apache.pinot.spi.trace.RequestContext; +import org.apache.pinot.sql.parsers.SqlNodeAndOptions; +import org.apache.pinot.tsdb.planner.TimeSeriesQueryEnvironment; +import org.apache.pinot.tsdb.planner.physical.TimeSeriesDispatchablePlan; +import org.apache.pinot.tsdb.spi.RangeTimeSeriesRequest; +import org.apache.pinot.tsdb.spi.TimeSeriesLogicalPlanResult; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +public class TimeSeriesRequestHandler extends BaseBrokerRequestHandler { + private static final Logger LOGGER = LoggerFactory.getLogger(TimeSeriesRequestHandler.class); + 
private static final long DEFAULT_STEP_SECONDS = 60L; + private final TimeSeriesQueryEnvironment _queryEnvironment; + private final QueryDispatcher _queryDispatcher; + + public TimeSeriesRequestHandler(PinotConfiguration config, String brokerId, BrokerRoutingManager routingManager, + AccessControlFactory accessControlFactory, QueryQuotaManager queryQuotaManager, TableCache tableCache, + QueryDispatcher queryDispatcher) { + super(config, brokerId, routingManager, accessControlFactory, queryQuotaManager, tableCache); + _queryEnvironment = new TimeSeriesQueryEnvironment(config, routingManager, tableCache); + _queryEnvironment.init(config); + _queryDispatcher = queryDispatcher; + } + + @Override + protected BrokerResponse handleRequest(long requestId, String query, @Nullable SqlNodeAndOptions sqlNodeAndOptions, + JsonNode request, @Nullable RequesterIdentity requesterIdentity, RequestContext requestContext, + @Nullable HttpHeaders httpHeaders, AccessControl accessControl) + throws Exception { + throw new IllegalArgumentException("Not supported yet"); + } + + @Override + public void start() { + LOGGER.info("Starting time-series request handler"); + } + + @Override + public void shutDown() { + LOGGER.info("Shutting down time-series request handler"); + } + + @Override + public PinotBrokerTimeSeriesResponse handleTimeSeriesRequest(String lang, String rawQueryParamString, + RequestContext requestContext) { + RangeTimeSeriesRequest timeSeriesRequest = null; + try { + timeSeriesRequest = buildRangeTimeSeriesRequest(lang, rawQueryParamString); + } catch (URISyntaxException e) { + throw new RuntimeException(e); + } + TimeSeriesLogicalPlanResult logicalPlanResult = _queryEnvironment.buildLogicalPlan(timeSeriesRequest); + TimeSeriesDispatchablePlan dispatchablePlan = _queryEnvironment.buildPhysicalPlan(timeSeriesRequest, requestContext, + logicalPlanResult); + return _queryDispatcher.submitAndGet(requestContext, dispatchablePlan, timeSeriesRequest.getTimeout().toMillis(), + new 
HashMap<>()); + } + + @Override + public Map<Long, String> getRunningQueries() { + // TODO: Implement this. + return Map.of(); + } + + @Override + public boolean cancelQuery(long queryId, int timeoutMs, Executor executor, HttpClientConnectionManager connMgr, + Map<String, Integer> serverResponses) + throws Exception { + // TODO: Implement this. + return false; + } + + private RangeTimeSeriesRequest buildRangeTimeSeriesRequest(String language, String queryParamString) + throws URISyntaxException { + List<NameValuePair> pairs = URLEncodedUtils.parse( + new URI("http://localhost?" + queryParamString), "UTF-8"); + String query = null; + Long startTs = null; + Long endTs = null; + String step = null; + String timeoutStr = null; + for (NameValuePair nameValuePair : pairs) { + switch (nameValuePair.getName()) { + case "query": + query = nameValuePair.getValue(); + break; + case "start": + startTs = Long.parseLong(nameValuePair.getValue()); + break; + case "end": + endTs = Long.parseLong(nameValuePair.getValue()); + break; + case "step": + step = nameValuePair.getValue(); + break; + case "timeout": + timeoutStr = nameValuePair.getValue(); + break; + default: + /* Okay to ignore unknown parameters since the language implementor may be using them. 
*/ + break; + } + } + Preconditions.checkNotNull(query, "Query cannot be null"); + Preconditions.checkNotNull(startTs, "Start time cannot be null"); + Preconditions.checkNotNull(endTs, "End time cannot be null"); + Duration timeout = Duration.ofMillis(_brokerTimeoutMs); + if (StringUtils.isNotBlank(timeoutStr)) { + timeout = HumanReadableDuration.from(timeoutStr); + } + // TODO: Pass full raw query param string to the request + return new RangeTimeSeriesRequest(language, query, startTs, endTs, getStepSeconds(step), timeout, queryParamString); + } + + public static Long getStepSeconds(@Nullable String step) { + if (step == null) { + return DEFAULT_STEP_SECONDS; + } + try { + return Long.parseLong(step); + } catch (NumberFormatException ignored) { Review Comment: Why are we ignoring the exception? Will we revert to default in case of invalid step time? ########## pinot-timeseries/pinot-timeseries-planner/src/main/java/org/apache/pinot/tsdb/planner/TimeSeriesQueryEnvironment.java: ########## @@ -0,0 +1,146 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.pinot.tsdb.planner; + +import com.google.common.base.Preconditions; +import java.lang.reflect.Constructor; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; +import java.util.function.Consumer; +import org.apache.pinot.common.config.provider.TableCache; +import org.apache.pinot.common.request.BrokerRequest; +import org.apache.pinot.common.request.DataSource; +import org.apache.pinot.common.request.PinotQuery; +import org.apache.pinot.common.request.QuerySource; +import org.apache.pinot.core.routing.RoutingManager; +import org.apache.pinot.core.routing.RoutingTable; +import org.apache.pinot.core.transport.ServerInstance; +import org.apache.pinot.spi.env.PinotConfiguration; +import org.apache.pinot.spi.trace.RequestContext; +import org.apache.pinot.tsdb.planner.physical.TableScanVisitor; +import org.apache.pinot.tsdb.planner.physical.TimeSeriesDispatchablePlan; +import org.apache.pinot.tsdb.planner.physical.TimeSeriesQueryServerInstance; +import org.apache.pinot.tsdb.spi.PinotTimeSeriesConfiguration; +import org.apache.pinot.tsdb.spi.RangeTimeSeriesRequest; +import org.apache.pinot.tsdb.spi.TimeSeriesLogicalPlanResult; +import org.apache.pinot.tsdb.spi.TimeSeriesLogicalPlanner; +import org.apache.pinot.tsdb.spi.plan.BaseTimeSeriesPlanNode; +import org.apache.pinot.tsdb.spi.plan.ScanFilterAndProjectPlanNode; +import org.apache.pinot.tsdb.spi.plan.serde.TimeSeriesPlanSerde; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +public class TimeSeriesQueryEnvironment { + private static final Logger LOGGER = LoggerFactory.getLogger(TimeSeriesQueryEnvironment.class); + private final RoutingManager _routingManager; + private final TableCache _tableCache; + private final Map<String, TimeSeriesLogicalPlanner> _plannerMap = new HashMap<>(); + + public TimeSeriesQueryEnvironment(PinotConfiguration config, RoutingManager routingManager, TableCache tableCache) { + _routingManager = 
routingManager; + _tableCache = tableCache; + } + + public void init(PinotConfiguration config) { + String[] languages = config.getProperty(PinotTimeSeriesConfiguration.getEnabledLanguagesConfigKey(), "") + .split(","); + LOGGER.info("Found {} configured time series languages. List: {}", languages.length, languages); + for (String language : languages) { + String configPrefix = PinotTimeSeriesConfiguration.getLogicalPlannerConfigKey(language); + String klassName = + config.getProperty(PinotTimeSeriesConfiguration.getLogicalPlannerConfigKey(language)); + Preconditions.checkNotNull(klassName, "Logical planner class not found for language: " + language); + // Create the planner with empty constructor + try { + Class<?> klass = TimeSeriesQueryEnvironment.class.getClassLoader().loadClass(klassName); + Constructor<?> constructor = klass.getConstructor(); + TimeSeriesLogicalPlanner planner = (TimeSeriesLogicalPlanner) constructor.newInstance(); + planner.init(config.subset(configPrefix).toMap()); + _plannerMap.put(language, planner); + } catch (Exception e) { + throw new RuntimeException("Failed to instantiate logical planner for language: " + language, e); + } + } + TableScanVisitor.INSTANCE.init(_routingManager); + } + + public TimeSeriesLogicalPlanResult buildLogicalPlan(RangeTimeSeriesRequest request) { + Preconditions.checkState(_plannerMap.containsKey(request.getLanguage()), + "No logical planner found for engine: %s. Available: %s", request.getLanguage(), + _plannerMap.keySet()); + return _plannerMap.get(request.getLanguage()).plan(request); + } + + public TimeSeriesDispatchablePlan buildPhysicalPlan(RangeTimeSeriesRequest timeSeriesRequest, + RequestContext requestContext, TimeSeriesLogicalPlanResult logicalPlan) { + // Step-1: Find tables in the query. 
+ final Set<String> tableNames = new HashSet<>(); + findTableNames(logicalPlan.getPlanNode(), tableNames::add); + Preconditions.checkState(tableNames.size() == 1, + "Expected exactly one table name in the logical plan, got: %s", + tableNames); + String tableName = tableNames.iterator().next(); + // Step-2: Compute routing table assuming all segments are selected. This is to perform the check to reject tables Review Comment: To avoid this, we will need to make sure that in the write path all segments for a table go to a single server until we implement multi-server support in phase 2. Can we capture the limitations of phase 1 in the task list? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org