raghavyadav01 commented on code in PR #14048:
URL: https://github.com/apache/pinot/pull/14048#discussion_r1771867719


##########
pinot-broker/src/main/java/org/apache/pinot/broker/api/resources/PinotClientRequest.java:
##########
@@ -236,6 +238,48 @@ public void processSqlWithMultiStageQueryEnginePost(String query, @Suspended Asy
     }
   }
 
+  @GET
+  @ManagedAsync
+  @Produces(MediaType.APPLICATION_JSON)
+  @Path("timeseries/api/v1/query_range")
+  @ApiOperation(value = "Prometheus Compatible API for Pinot's Time Series 
Engine")
+  @ManualAuthorization
+  public void processTimeSeriesQueryEngine(@Suspended AsyncResponse asyncResponse,
+      @QueryParam("language") String language,
+      @Context org.glassfish.grizzly.http.server.Request requestCtx,
+      @Context HttpHeaders httpHeaders) {
+    try {
+      try (RequestScope requestContext = Tracing.getTracer().createRequestScope()) {
+        String queryString = requestCtx.getQueryString();
+        PinotBrokerTimeSeriesResponse response = executeTimeSeriesQuery(language, queryString, requestContext);
+        if (response.getErrorType() != null && !response.getErrorType().isEmpty()) {
+          asyncResponse.resume(Response.serverError().entity(response).build());
+          return;
+        }
+        asyncResponse.resume(response);
+      }
+    } catch (Exception e) {
+      LOGGER.error("Caught exception while processing POST request", e);

Review Comment:
   Where do we translate errors into HTTP status codes, e.g. an invalid parameter or a validation error raised by the query planner?
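   A minimal sketch of what such a translation could look like (hedged: it assumes the planner surfaces bad input as `IllegalArgumentException`, which this PR does not confirm), reusing the error factory already in `PinotBrokerTimeSeriesResponse`:
   ```java
   // Sketch only: map validation/parsing failures to 400 instead of a blanket 500.
   try {
     PinotBrokerTimeSeriesResponse response = executeTimeSeriesQuery(language, queryString, requestContext);
     asyncResponse.resume(response);
   } catch (IllegalArgumentException e) {
     // Hypothetical: planner validation errors become HTTP 400 with a structured error body.
     asyncResponse.resume(Response.status(Response.Status.BAD_REQUEST)
         .entity(PinotBrokerTimeSeriesResponse.newErrorResponse("BAD_REQUEST", e.getMessage()))
         .build());
   } catch (Exception e) {
     asyncResponse.resume(Response.serverError()
         .entity(PinotBrokerTimeSeriesResponse.newErrorResponse(e.getClass().getSimpleName(), e.getMessage()))
         .build());
   }
   ```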



##########
pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/TimeSeriesRequestHandler.java:
##########
@@ -0,0 +1,172 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.broker.requesthandler;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.google.common.base.Preconditions;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.time.Duration;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.Executor;
+import javax.annotation.Nullable;
+import javax.ws.rs.core.HttpHeaders;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.hc.client5.http.io.HttpClientConnectionManager;
+import org.apache.http.NameValuePair;
+import org.apache.http.client.utils.URLEncodedUtils;
+import org.apache.pinot.broker.api.AccessControl;
+import org.apache.pinot.broker.api.RequesterIdentity;
+import org.apache.pinot.broker.broker.AccessControlFactory;
+import org.apache.pinot.broker.queryquota.QueryQuotaManager;
+import org.apache.pinot.broker.routing.BrokerRoutingManager;
+import org.apache.pinot.common.config.provider.TableCache;
+import org.apache.pinot.common.response.BrokerResponse;
+import org.apache.pinot.common.response.PinotBrokerTimeSeriesResponse;
+import org.apache.pinot.common.utils.HumanReadableDuration;
+import org.apache.pinot.query.service.dispatch.QueryDispatcher;
+import org.apache.pinot.spi.env.PinotConfiguration;
+import org.apache.pinot.spi.trace.RequestContext;
+import org.apache.pinot.sql.parsers.SqlNodeAndOptions;
+import org.apache.pinot.tsdb.planner.TimeSeriesQueryEnvironment;
+import org.apache.pinot.tsdb.planner.physical.TimeSeriesDispatchablePlan;
+import org.apache.pinot.tsdb.spi.RangeTimeSeriesRequest;
+import org.apache.pinot.tsdb.spi.TimeSeriesLogicalPlanResult;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+public class TimeSeriesRequestHandler extends BaseBrokerRequestHandler {
+  private static final Logger LOGGER = LoggerFactory.getLogger(TimeSeriesRequestHandler.class);
+  private static final long DEFAULT_STEP_SECONDS = 60L;
+  private final TimeSeriesQueryEnvironment _queryEnvironment;
+  private final QueryDispatcher _queryDispatcher;
+
+  public TimeSeriesRequestHandler(PinotConfiguration config, String brokerId, BrokerRoutingManager routingManager,
+      AccessControlFactory accessControlFactory, QueryQuotaManager queryQuotaManager, TableCache tableCache,
+      QueryDispatcher queryDispatcher) {
+    super(config, brokerId, routingManager, accessControlFactory, queryQuotaManager, tableCache);
+    _queryEnvironment = new TimeSeriesQueryEnvironment(config, routingManager, tableCache);
+    _queryEnvironment.init(config);
+    _queryDispatcher = queryDispatcher;
+  }
+
+  @Override
+  protected BrokerResponse handleRequest(long requestId, String query, @Nullable SqlNodeAndOptions sqlNodeAndOptions,
+      JsonNode request, @Nullable RequesterIdentity requesterIdentity, RequestContext requestContext,
+      @Nullable HttpHeaders httpHeaders, AccessControl accessControl)
+      throws Exception {
+    throw new IllegalArgumentException("Not supported yet");
+  }
+
+  @Override
+  public void start() {
+    LOGGER.info("Starting time-series request handler");
+  }
+
+  @Override
+  public void shutDown() {
+    LOGGER.info("Shutting down time-series request handler");
+  }
+
+  @Override
+  public PinotBrokerTimeSeriesResponse handleTimeSeriesRequest(String lang, String rawQueryParamString,
+      RequestContext requestContext) {
+    RangeTimeSeriesRequest timeSeriesRequest = null;
+    try {
+      timeSeriesRequest = buildRangeTimeSeriesRequest(lang, rawQueryParamString);
+    } catch (URISyntaxException e) {
+      throw new RuntimeException(e);
+    }
+    TimeSeriesLogicalPlanResult logicalPlanResult = _queryEnvironment.buildLogicalPlan(timeSeriesRequest);
+    TimeSeriesDispatchablePlan dispatchablePlan = _queryEnvironment.buildPhysicalPlan(timeSeriesRequest, requestContext,
+        logicalPlanResult);
+    return _queryDispatcher.submitAndGet(requestContext, dispatchablePlan, timeSeriesRequest.getTimeout().toMillis(),
+        new HashMap<>());
+  }
+
+  @Override
+  public Map<Long, String> getRunningQueries() {
+    // TODO: Implement this.
+    return Map.of();
+  }
+
+  @Override
+  public boolean cancelQuery(long queryId, int timeoutMs, Executor executor, HttpClientConnectionManager connMgr,
+      Map<String, Integer> serverResponses)
+      throws Exception {
+    // TODO: Implement this.
+    return false;
+  }
+
+  private RangeTimeSeriesRequest buildRangeTimeSeriesRequest(String language, String queryParamString)
+      throws URISyntaxException {
+    List<NameValuePair> pairs = URLEncodedUtils.parse(
+        new URI("http://localhost?"; + queryParamString), "UTF-8");
+    String query = null;
+    Long startTs = null;
+    Long endTs = null;
+    String step = null;
+    String timeoutStr = null;
+    for (NameValuePair nameValuePair : pairs) {
+      switch (nameValuePair.getName()) {
+        case "query":
+          query = nameValuePair.getValue();
+          break;
+        case "start":
+          startTs = Long.parseLong(nameValuePair.getValue());
+          break;
+        case "end":
+          endTs = Long.parseLong(nameValuePair.getValue());
+          break;
+        case "step":
+          step = nameValuePair.getValue();
+          break;
+        case "timeout":
+          timeoutStr = nameValuePair.getValue();
+          break;
+        default:
+          /* Okay to ignore unknown parameters since the language implementor may be using them. */
+          break;
+      }
+    }
+    Preconditions.checkNotNull(query, "Query cannot be null");
+    Preconditions.checkNotNull(startTs, "Start time cannot be null");
+    Preconditions.checkNotNull(endTs, "End time cannot be null");
+    Duration timeout = Duration.ofMillis(_brokerTimeoutMs);
+    if (StringUtils.isNotBlank(timeoutStr)) {
+      timeout = HumanReadableDuration.from(timeoutStr);
+    }
+    // TODO: Pass full raw query param string to the request

Review Comment:
   This is handled now, correct?



##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/QueryRunner.java:
##########
@@ -210,6 +235,69 @@ public void processQuery(WorkerMetadata workerMetadata, StagePlan stagePlan, Map
     _opChainScheduler.register(opChain);
   }
 
+  /**
+   * Receives a serialized plan sent by the broker, and runs it to completion, blocking the thread until the execution

Review Comment:
   Is there a timeout on the request in this thread, or can it hang forever?
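   Purely as an illustration of the concern (all names here are hypothetical, not from this PR), one way to bound a blocking execution with the request deadline:
   ```java
   import java.util.concurrent.*;

   // Sketch: run the blocking work on an executor and cap the wait with the request timeout,
   // cancelling (interrupting) the worker if the deadline is exceeded.
   public final class BoundedExecutionSketch {
     public static <T> T runWithDeadline(Callable<T> work, long timeoutMs, ExecutorService executor)
         throws Exception {
       Future<T> future = executor.submit(work);
       try {
         return future.get(timeoutMs, TimeUnit.MILLISECONDS);
       } catch (TimeoutException e) {
         future.cancel(true);  // interrupt the worker so it does not run past the deadline
         throw e;
       }
     }
   }
   ```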



##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/service/dispatch/QueryDispatcher.java:
##########
@@ -245,6 +323,14 @@ private DispatchClient getOrCreateDispatchClient(QueryServerInstance queryServer
     return _dispatchClientMap.computeIfAbsent(key, k -> new DispatchClient(hostname, port));
   }
 
+  private TimeSeriesDispatchClient getOrCreateTimeSeriesDispatchClient(
+      TimeSeriesQueryServerInstance queryServerInstance) {
+    String hostname = queryServerInstance.getHostname();
+    int port = queryServerInstance.getQueryServicePort();
+    String key = String.format("%s_%d", hostname, port);

Review Comment:
   Is this key generation similar to the SQL query dispatcher's? I am wondering what the side effect would be if the hostname changes for a pod, especially in a Kubernetes cluster.



##########
pinot-broker/src/main/java/org/apache/pinot/broker/api/resources/PinotClientRequest.java:
##########
@@ -236,6 +238,48 @@ public void processSqlWithMultiStageQueryEnginePost(String query, @Suspended Asy
     }
   }
 
+  @GET
+  @ManagedAsync
+  @Produces(MediaType.APPLICATION_JSON)
+  @Path("timeseries/api/v1/query_range")
+  @ApiOperation(value = "Prometheus Compatible API for Pinot's Time Series 
Engine")
+  @ManualAuthorization
+  public void processTimeSeriesQueryEngine(@Suspended AsyncResponse asyncResponse,

Review Comment:
   I noticed these APIs are not part of the Swagger endpoint. Do we need additional changes to include them in the Swagger console in Pinot?



##########
pinot-plugins/pinot-timeseries-lang/pinot-timeseries-m3ql/src/main/java/org/apache/pinot/tsdb/m3ql/time/TimeBucketComputer.java:
##########
@@ -0,0 +1,79 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.tsdb.m3ql.time;
+
+import java.time.Duration;
+import java.util.Collection;
+import org.apache.pinot.tsdb.spi.RangeTimeSeriesRequest;
+import org.apache.pinot.tsdb.spi.TimeBuckets;
+import org.apache.pinot.tsdb.spi.plan.BaseTimeSeriesPlanNode;
+import org.apache.pinot.tsdb.spi.plan.ScanFilterAndProjectPlanNode;
+
+
+public class TimeBucketComputer {
+  private TimeBucketComputer() {
+  }
+
+  public static TimeBuckets compute(BaseTimeSeriesPlanNode planNode, RangeTimeSeriesRequest request) {
+    QueryTimeBoundaryConstraints constraints = process(planNode, request);
+    long newStartTime = request.getStartSeconds() - constraints.getLeftExtensionSeconds();

Review Comment:
   For my understanding, how are these constraints defined in M3QL?



##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/timeseries/TimeSeriesExecutionContext.java:
##########
@@ -0,0 +1,49 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.query.runtime.timeseries;
+
+import java.util.List;
+import java.util.Map;
+import org.apache.pinot.tsdb.spi.TimeBuckets;
+
+
+public class TimeSeriesExecutionContext {
+  private final String _language;
+  private final TimeBuckets _initialTimeBuckets;

Review Comment:
   Why "initial"? Can these TimeBuckets change while execution is happening?



##########
pinot-common/src/main/java/org/apache/pinot/common/response/PinotBrokerTimeSeriesResponse.java:
##########
@@ -0,0 +1,168 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.common.response;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonInclude;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.base.Preconditions;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import org.apache.pinot.spi.annotations.InterfaceStability;
+import org.apache.pinot.tsdb.spi.series.TimeSeries;
+import org.apache.pinot.tsdb.spi.series.TimeSeriesBlock;
+
+
+/**
+ * POJO returned by the Pinot broker in a time-series query response. Format is defined
+ * <a href="https://prometheus.io/docs/prometheus/latest/querying/api/">in the prometheus docs.</a>
+ * TODO: We will evolve this until Pinot 1.3. At present we are mimicking Prometheus HTTP API.
+ */
+@InterfaceStability.Evolving
+public class PinotBrokerTimeSeriesResponse {
+  public static final String SUCCESS_STATUS = "success";
+  public static final String ERROR_STATUS = "error";
+  public static final String METRIC_NAME_KEY = "__name__";
+  private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+  private String _status;
+  private Data _data;
+  private String _errorType;
+  private String _error;
+
+  static {
+    OBJECT_MAPPER.setSerializationInclusion(JsonInclude.Include.NON_NULL);
+  }
+
+  @JsonCreator
+  public PinotBrokerTimeSeriesResponse(@JsonProperty("status") String status, 
@JsonProperty("data") Data data,
+      @JsonProperty("errorType") String errorType, @JsonProperty("error") 
String error) {
+    _status = status;
+    _data = data;
+    _errorType = errorType;
+    _error = error;
+  }
+
+  public String getStatus() {
+    return _status;
+  }
+
+  public Data getData() {
+    return _data;
+  }
+
+  public String getErrorType() {
+    return _errorType;
+  }
+
+  public String getError() {
+    return _error;
+  }
+
+  public String serialize()
+      throws JsonProcessingException {
+    return OBJECT_MAPPER.writeValueAsString(this);
+  }
+
+  public static PinotBrokerTimeSeriesResponse newSuccessResponse(Data data) {
+    return new PinotBrokerTimeSeriesResponse(SUCCESS_STATUS, data, null, null);
+  }
+
+  public static PinotBrokerTimeSeriesResponse newErrorResponse(String errorType, String errorMessage) {
+    return new PinotBrokerTimeSeriesResponse(ERROR_STATUS, Data.EMPTY, errorType, errorMessage);
+  }
+
+  public static PinotBrokerTimeSeriesResponse fromTimeSeriesBlock(TimeSeriesBlock seriesBlock) {
+    if (seriesBlock.getTimeBuckets() == null) {
+      throw new UnsupportedOperationException("Non-bucketed series block not supported yet");
+    }
+    return convertBucketedSeriesBlock(seriesBlock);
+  }
+
+  private static PinotBrokerTimeSeriesResponse convertBucketedSeriesBlock(TimeSeriesBlock seriesBlock) {
+    Long[] timeValues = Objects.requireNonNull(seriesBlock.getTimeBuckets()).getTimeBuckets();
+    List<PinotBrokerTimeSeriesResponse.Value> result = new ArrayList<>();
+    for (var listOfTimeSeries : seriesBlock.getSeriesMap().values()) {
+      Preconditions.checkState(!listOfTimeSeries.isEmpty(), "Received empty time-series");
+      TimeSeries anySeries = listOfTimeSeries.get(0);
+      // TODO: Ideally we should allow "aliasing" a series, and hence we should store something like a series-name.
+      //  Though in most contexts that would serve the same purpose as an ID.
+      Map<String, String> metricMap = new HashMap<>();
+      metricMap.put(METRIC_NAME_KEY, anySeries.getTagsSerialized());
+      metricMap.putAll(anySeries.getTagKeyValuesAsMap());
+      for (TimeSeries timeSeries : listOfTimeSeries) {
+        Object[][] values = new Object[timeValues.length][];
+        for (int i = 0; i < timeValues.length; i++) {
+          Object nullableValue = timeSeries.getValues()[i];
+          values[i] = new Object[]{timeValues[i], nullableValue == null ? null : nullableValue.toString()};
+        }
+        result.add(new PinotBrokerTimeSeriesResponse.Value(metricMap, values));
+      }
+    }
+    PinotBrokerTimeSeriesResponse.Data data = PinotBrokerTimeSeriesResponse.Data.newMatrix(result);
+    return PinotBrokerTimeSeriesResponse.newSuccessResponse(data);
+  }
+
+  public static class Data {
+    public static final Data EMPTY = new Data("", new ArrayList<>());
+    private final String _resultType;
+    private final List<Value> _result;
+
+    @JsonCreator
+    public Data(@JsonProperty("resultType") String resultType, 
@JsonProperty("result") List<Value> result) {
+      _resultType = resultType;
+      _result = result;
+    }
+
+    public String getResultType() {
+      return _resultType;
+    }
+
+    public List<Value> getResult() {
+      return _result;
+    }
+
+    public static Data newMatrix(List<Value> result) {

Review Comment:
   We will need to add the other Prometheus result types as well: "vector", "scalar", and "string".
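   For illustration, the missing factories could mirror `newMatrix` (hedged: this assumes `newMatrix` simply wraps the result-type string, which is not shown in this hunk):
   ```java
   // Hypothetical additions to Data, covering the remaining Prometheus result types.
   public static Data newVector(List<Value> result) {
     return new Data("vector", result);
   }

   public static Data newScalar(List<Value> result) {
     return new Data("scalar", result);
   }

   public static Data newString(List<Value> result) {
     return new Data("string", result);
   }
   ```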



##########
pinot-timeseries/pinot-timeseries-spi/src/main/java/org/apache/pinot/tsdb/spi/series/TimeSeriesBlock.java:
##########
@@ -33,11 +34,12 @@ public class TimeSeriesBlock {
   private final TimeBuckets _timeBuckets;
   private final Map<Long, List<TimeSeries>> _seriesMap;
 
-  public TimeSeriesBlock(TimeBuckets timeBuckets, Map<Long, List<TimeSeries>> seriesMap) {
+  public TimeSeriesBlock(@Nullable TimeBuckets timeBuckets, Map<Long, List<TimeSeries>> seriesMap) {

Review Comment:
   Why is timeBuckets nullable? What happens when it is null? Is this for the instant query use case?



##########
pinot-plugins/pinot-timeseries-lang/pinot-timeseries-m3ql/src/main/java/org/apache/pinot/tsdb/m3ql/M3TimeSeriesPlanner.java:
##########
@@ -0,0 +1,158 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.tsdb.m3ql;
+
+import com.google.common.base.Preconditions;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+import org.apache.pinot.tsdb.m3ql.parser.Tokenizer;
+import org.apache.pinot.tsdb.m3ql.plan.KeepLastValuePlanNode;
+import org.apache.pinot.tsdb.m3ql.plan.TransformNullPlanNode;
+import org.apache.pinot.tsdb.m3ql.time.TimeBucketComputer;
+import org.apache.pinot.tsdb.spi.AggInfo;
+import org.apache.pinot.tsdb.spi.RangeTimeSeriesRequest;
+import org.apache.pinot.tsdb.spi.TimeBuckets;
+import org.apache.pinot.tsdb.spi.TimeSeriesLogicalPlanResult;
+import org.apache.pinot.tsdb.spi.TimeSeriesLogicalPlanner;
+import org.apache.pinot.tsdb.spi.plan.BaseTimeSeriesPlanNode;
+import org.apache.pinot.tsdb.spi.plan.ScanFilterAndProjectPlanNode;
+
+
+public class M3TimeSeriesPlanner implements TimeSeriesLogicalPlanner {
+  @Override
+  public void init(Map<String, Object> config) {
+  }
+
+  @Override
+  public TimeSeriesLogicalPlanResult plan(RangeTimeSeriesRequest request) {
+    if (!request.getLanguage().equals(Constants.LANGUAGE)) {
+      throw new IllegalArgumentException(String.format("Invalid engine id: %s. Expected: %s", request.getLanguage(),
+          Constants.LANGUAGE));
+    }
+    // Step-1: Parse and create a logical plan tree.
+    BaseTimeSeriesPlanNode planNode = planQuery(request);
+    // Step-2: Compute the time-buckets.
+    TimeBuckets timeBuckets = TimeBucketComputer.compute(planNode, request);
+    return new TimeSeriesLogicalPlanResult(planNode, timeBuckets);
+  }
+
+  public BaseTimeSeriesPlanNode planQuery(RangeTimeSeriesRequest request) {
+    PlanIdGenerator planIdGenerator = new PlanIdGenerator();
+    Tokenizer tokenizer = new Tokenizer(request.getQuery());
+    List<List<String>> commands = tokenizer.tokenize();
+    Preconditions.checkState(commands.size() > 1, "At least two commands required. "
+        + "Query should start with a fetch followed by an aggregation.");
+    BaseTimeSeriesPlanNode lastNode = null;
+    AggInfo aggInfo = null;
+    List<String> groupByColumns = new ArrayList<>();
+    BaseTimeSeriesPlanNode rootNode = null;
+    for (int commandId = commands.size() - 1; commandId >= 0; commandId--) {

Review Comment:
   Do you plan to introduce an AST-based grammar for M3QL, or is this approach sufficient?



##########
pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/TimeSeriesRequestHandler.java:
##########
@@ -0,0 +1,172 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.broker.requesthandler;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.google.common.base.Preconditions;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.time.Duration;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.Executor;
+import javax.annotation.Nullable;
+import javax.ws.rs.core.HttpHeaders;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.hc.client5.http.io.HttpClientConnectionManager;
+import org.apache.http.NameValuePair;
+import org.apache.http.client.utils.URLEncodedUtils;
+import org.apache.pinot.broker.api.AccessControl;
+import org.apache.pinot.broker.api.RequesterIdentity;
+import org.apache.pinot.broker.broker.AccessControlFactory;
+import org.apache.pinot.broker.queryquota.QueryQuotaManager;
+import org.apache.pinot.broker.routing.BrokerRoutingManager;
+import org.apache.pinot.common.config.provider.TableCache;
+import org.apache.pinot.common.response.BrokerResponse;
+import org.apache.pinot.common.response.PinotBrokerTimeSeriesResponse;
+import org.apache.pinot.common.utils.HumanReadableDuration;
+import org.apache.pinot.query.service.dispatch.QueryDispatcher;
+import org.apache.pinot.spi.env.PinotConfiguration;
+import org.apache.pinot.spi.trace.RequestContext;
+import org.apache.pinot.sql.parsers.SqlNodeAndOptions;
+import org.apache.pinot.tsdb.planner.TimeSeriesQueryEnvironment;
+import org.apache.pinot.tsdb.planner.physical.TimeSeriesDispatchablePlan;
+import org.apache.pinot.tsdb.spi.RangeTimeSeriesRequest;
+import org.apache.pinot.tsdb.spi.TimeSeriesLogicalPlanResult;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+public class TimeSeriesRequestHandler extends BaseBrokerRequestHandler {
+  private static final Logger LOGGER = LoggerFactory.getLogger(TimeSeriesRequestHandler.class);
+  private static final long DEFAULT_STEP_SECONDS = 60L;
+  private final TimeSeriesQueryEnvironment _queryEnvironment;
+  private final QueryDispatcher _queryDispatcher;
+
+  public TimeSeriesRequestHandler(PinotConfiguration config, String brokerId, BrokerRoutingManager routingManager,
+      AccessControlFactory accessControlFactory, QueryQuotaManager queryQuotaManager, TableCache tableCache,
+      QueryDispatcher queryDispatcher) {
+    super(config, brokerId, routingManager, accessControlFactory, queryQuotaManager, tableCache);
+    _queryEnvironment = new TimeSeriesQueryEnvironment(config, routingManager, tableCache);
+    _queryEnvironment.init(config);
+    _queryDispatcher = queryDispatcher;
+  }
+
+  @Override
+  protected BrokerResponse handleRequest(long requestId, String query, @Nullable SqlNodeAndOptions sqlNodeAndOptions,
+      JsonNode request, @Nullable RequesterIdentity requesterIdentity, RequestContext requestContext,
+      @Nullable HttpHeaders httpHeaders, AccessControl accessControl)
+      throws Exception {
+    throw new IllegalArgumentException("Not supported yet");
+  }
+
+  @Override
+  public void start() {
+    LOGGER.info("Starting time-series request handler");
+  }
+
+  @Override
+  public void shutDown() {
+    LOGGER.info("Shutting down time-series request handler");
+  }
+
+  @Override
+  public PinotBrokerTimeSeriesResponse handleTimeSeriesRequest(String lang, String rawQueryParamString,
+      RequestContext requestContext) {
+    RangeTimeSeriesRequest timeSeriesRequest = null;
+    try {
+      timeSeriesRequest = buildRangeTimeSeriesRequest(lang, rawQueryParamString);
+    } catch (URISyntaxException e) {
+      throw new RuntimeException(e);
+    }
+    TimeSeriesLogicalPlanResult logicalPlanResult = _queryEnvironment.buildLogicalPlan(timeSeriesRequest);
+    TimeSeriesDispatchablePlan dispatchablePlan = _queryEnvironment.buildPhysicalPlan(timeSeriesRequest, requestContext,
+        logicalPlanResult);
+    return _queryDispatcher.submitAndGet(requestContext, dispatchablePlan, timeSeriesRequest.getTimeout().toMillis(),
+        new HashMap<>());
+  }
+
+  @Override
+  public Map<Long, String> getRunningQueries() {
+    // TODO: Implement this.
+    return Map.of();
+  }
+
+  @Override
+  public boolean cancelQuery(long queryId, int timeoutMs, Executor executor, HttpClientConnectionManager connMgr,
+      Map<String, Integer> serverResponses)
+      throws Exception {
+    // TODO: Implement this.
+    return false;
+  }
+
+  private RangeTimeSeriesRequest buildRangeTimeSeriesRequest(String language, String queryParamString)
+      throws URISyntaxException {
+    List<NameValuePair> pairs = URLEncodedUtils.parse(
+        new URI("http://localhost?"; + queryParamString), "UTF-8");
+    String query = null;
+    Long startTs = null;
+    Long endTs = null;
+    String step = null;
+    String timeoutStr = null;
+    for (NameValuePair nameValuePair : pairs) {
+      switch (nameValuePair.getName()) {
+        case "query":
+          query = nameValuePair.getValue();
+          break;
+        case "start":
+          startTs = Long.parseLong(nameValuePair.getValue());
+          break;
+        case "end":
+          endTs = Long.parseLong(nameValuePair.getValue());
+          break;
+        case "step":
+          step = nameValuePair.getValue();
+          break;
+        case "timeout":
+          timeoutStr = nameValuePair.getValue();
+          break;
+        default:
+          /* Okay to ignore unknown parameters since the language implementor may be using them. */
+          break;
+      }
+    }
+    Preconditions.checkNotNull(query, "Query cannot be null");
+    Preconditions.checkNotNull(startTs, "Start time cannot be null");
+    Preconditions.checkNotNull(endTs, "End time cannot be null");
+    Duration timeout = Duration.ofMillis(_brokerTimeoutMs);
+    if (StringUtils.isNotBlank(timeoutStr)) {
+      timeout = HumanReadableDuration.from(timeoutStr);
+    }
+    // TODO: Pass full raw query param string to the request
+    return new RangeTimeSeriesRequest(language, query, startTs, endTs, getStepSeconds(step), timeout, queryParamString);
+  }
+
+  public static Long getStepSeconds(@Nullable String step) {
+    if (step == null) {
+      return DEFAULT_STEP_SECONDS;
+    }
+    try {
+      return Long.parseLong(step);
+    } catch (NumberFormatException ignored) {

Review Comment:
   Why are we ignoring the exception? Do we fall back to the default in case of an invalid step value?
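   A possible rewrite that makes the fallback explicit instead of silently swallowing the exception (hedged sketch: it assumes a non-numeric step should be parsed as a human-readable duration such as "5m", reusing `HumanReadableDuration` from this class's imports):
   ```java
   public static Long getStepSeconds(@Nullable String step) {
     if (step == null) {
       return DEFAULT_STEP_SECONDS;
     }
     try {
       return Long.parseLong(step);
     } catch (NumberFormatException ignored) {
       // Not a plain number of seconds; fall through and treat it as a duration string (e.g. "5m").
     }
     return HumanReadableDuration.from(step).getSeconds();
   }
   ```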



##########
pinot-timeseries/pinot-timeseries-planner/src/main/java/org/apache/pinot/tsdb/planner/TimeSeriesQueryEnvironment.java:
##########
@@ -0,0 +1,146 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.tsdb.planner;
+
+import com.google.common.base.Preconditions;
+import java.lang.reflect.Constructor;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.function.Consumer;
+import org.apache.pinot.common.config.provider.TableCache;
+import org.apache.pinot.common.request.BrokerRequest;
+import org.apache.pinot.common.request.DataSource;
+import org.apache.pinot.common.request.PinotQuery;
+import org.apache.pinot.common.request.QuerySource;
+import org.apache.pinot.core.routing.RoutingManager;
+import org.apache.pinot.core.routing.RoutingTable;
+import org.apache.pinot.core.transport.ServerInstance;
+import org.apache.pinot.spi.env.PinotConfiguration;
+import org.apache.pinot.spi.trace.RequestContext;
+import org.apache.pinot.tsdb.planner.physical.TableScanVisitor;
+import org.apache.pinot.tsdb.planner.physical.TimeSeriesDispatchablePlan;
+import org.apache.pinot.tsdb.planner.physical.TimeSeriesQueryServerInstance;
+import org.apache.pinot.tsdb.spi.PinotTimeSeriesConfiguration;
+import org.apache.pinot.tsdb.spi.RangeTimeSeriesRequest;
+import org.apache.pinot.tsdb.spi.TimeSeriesLogicalPlanResult;
+import org.apache.pinot.tsdb.spi.TimeSeriesLogicalPlanner;
+import org.apache.pinot.tsdb.spi.plan.BaseTimeSeriesPlanNode;
+import org.apache.pinot.tsdb.spi.plan.ScanFilterAndProjectPlanNode;
+import org.apache.pinot.tsdb.spi.plan.serde.TimeSeriesPlanSerde;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+public class TimeSeriesQueryEnvironment {
+  private static final Logger LOGGER = LoggerFactory.getLogger(TimeSeriesQueryEnvironment.class);
+  private final RoutingManager _routingManager;
+  private final TableCache _tableCache;
+  private final Map<String, TimeSeriesLogicalPlanner> _plannerMap = new 
HashMap<>();
+
+  public TimeSeriesQueryEnvironment(PinotConfiguration config, RoutingManager routingManager, TableCache tableCache) {
+    _routingManager = routingManager;
+    _tableCache = tableCache;
+  }
+
+  public void init(PinotConfiguration config) {
+    String[] languages = config.getProperty(PinotTimeSeriesConfiguration.getEnabledLanguagesConfigKey(), "")
+        .split(",");
+    LOGGER.info("Found {} configured time series languages. List: {}", 
languages.length, languages);
+    for (String language : languages) {
+      String configPrefix = PinotTimeSeriesConfiguration.getLogicalPlannerConfigKey(language);
+      String klassName =
+          config.getProperty(PinotTimeSeriesConfiguration.getLogicalPlannerConfigKey(language));
+      Preconditions.checkNotNull(klassName, "Logical planner class not found for language: " + language);
+      // Create the planner with empty constructor
+      try {
+        Class<?> klass = 
TimeSeriesQueryEnvironment.class.getClassLoader().loadClass(klassName);
+        Constructor<?> constructor = klass.getConstructor();
+        TimeSeriesLogicalPlanner planner = (TimeSeriesLogicalPlanner) constructor.newInstance();
+        planner.init(config.subset(configPrefix).toMap());
+        _plannerMap.put(language, planner);
+      } catch (Exception e) {
+        throw new RuntimeException("Failed to instantiate logical planner for 
language: " + language, e);
+      }
+    }
+    TableScanVisitor.INSTANCE.init(_routingManager);
+  }
+
+  public TimeSeriesLogicalPlanResult buildLogicalPlan(RangeTimeSeriesRequest request) {
+    Preconditions.checkState(_plannerMap.containsKey(request.getLanguage()),
+        "No logical planner found for engine: %s. Available: %s", 
request.getLanguage(),
+        _plannerMap.keySet());
+    return _plannerMap.get(request.getLanguage()).plan(request);
+  }
+
+  public TimeSeriesDispatchablePlan buildPhysicalPlan(RangeTimeSeriesRequest timeSeriesRequest,
+      RequestContext requestContext, TimeSeriesLogicalPlanResult logicalPlan) {
+    // Step-1: Find tables in the query.
+    final Set<String> tableNames = new HashSet<>();
+    findTableNames(logicalPlan.getPlanNode(), tableNames::add);
+    Preconditions.checkState(tableNames.size() == 1,
+        "Expected exactly one table name in the logical plan, got: %s",
+        tableNames);
+    String tableName = tableNames.iterator().next();
+    // Step-2: Compute routing table assuming all segments are selected. This is to perform the check to reject tables

Review Comment:
   To avoid this, we will need to make sure that in the write path all segments for a table go to a single server, until we implement multi-server support in phase 2. Can we capture the limitations of phase 1 in the task list?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

