This is an automated email from the ASF dual-hosted git repository.

ankitsultana pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 7f55d4f1f63 [timeseries] Introducing POST /query/timeseries as a 
BrokerResponse compatible timeseries API (#16531)
7f55d4f1f63 is described below

commit 7f55d4f1f6327dd360961b982ba0693451e2b96f
Author: Shaurya Chaturvedi <[email protected]>
AuthorDate: Tue Aug 12 20:39:46 2025 -0700

    [timeseries] Introducing POST /query/timeseries as a BrokerResponse 
compatible timeseries API (#16531)
    
    Co-authored-by: Shaurya Chaturvedi <[email protected]>
---
 .../broker/api/resources/PinotClientRequest.java   |  36 +++++-
 .../requesthandler/BrokerRequestHandler.java       |   3 +-
 .../BrokerRequestHandlerDelegate.java              |   7 +-
 .../requesthandler/TimeSeriesRequestHandler.java   |  80 +++++-------
 .../response/PinotBrokerTimeSeriesResponse.java    |  61 ++++++++++
 .../pinot/integration/tests/ClusterTest.java       |  12 ++
 .../tests/TimeSeriesIntegrationTest.java           | 134 +++++++++++++++------
 .../apache/pinot/spi/utils/CommonConstants.java    |   2 +
 .../java/org/apache/pinot/spi/utils/JsonUtils.java |   5 +
 9 files changed, 250 insertions(+), 90 deletions(-)

diff --git 
a/pinot-broker/src/main/java/org/apache/pinot/broker/api/resources/PinotClientRequest.java
 
b/pinot-broker/src/main/java/org/apache/pinot/broker/api/resources/PinotClientRequest.java
index 55c943cbd67..dc49e0aec73 100644
--- 
a/pinot-broker/src/main/java/org/apache/pinot/broker/api/resources/PinotClientRequest.java
+++ 
b/pinot-broker/src/main/java/org/apache/pinot/broker/api/resources/PinotClientRequest.java
@@ -280,6 +280,33 @@ public class PinotClientRequest {
     }
   }
 
+  @POST
+  @ManagedAsync
+  @Produces(MediaType.APPLICATION_JSON)
+  @Path("query/timeseries")
+  @ApiOperation(value = "Query Pinot using the Time Series Engine")
+  @ManualAuthorization
+  public void processTimeSeriesQueryEngine(Map<String, String> queryParams, 
@Suspended AsyncResponse asyncResponse,
+      @Context org.glassfish.grizzly.http.server.Request requestCtx, @Context 
HttpHeaders httpHeaders) {
+    try {
+      if (!queryParams.containsKey(Request.QUERY)) {
+        throw new IllegalStateException("Payload is missing the query string 
field 'query'");
+      }
+      String language = queryParams.get(Request.LANGUAGE);
+      String queryString = queryParams.get(Request.QUERY);
+      try (RequestScope requestContext = 
Tracing.getTracer().createRequestScope()) {
+        PinotBrokerTimeSeriesResponse response = 
executeTimeSeriesQuery(language, queryString, queryParams,
+            requestContext, makeHttpIdentity(requestCtx), httpHeaders);
+        asyncResponse.resume(response.toBrokerResponse());
+      }
+    } catch (Exception e) {
+      LOGGER.error("Caught exception while processing POST timeseries 
request", e);
+      
_brokerMetrics.addMeteredGlobalValue(BrokerMeter.UNCAUGHT_POST_EXCEPTIONS, 1L);
+      asyncResponse.resume(new WebApplicationException(e,
+          
Response.status(Response.Status.INTERNAL_SERVER_ERROR).entity(e.getMessage()).build()));
+    }
+  }
+
   @GET
   @ManagedAsync
   @Produces(MediaType.APPLICATION_JSON)
@@ -293,7 +320,7 @@ public class PinotClientRequest {
     try {
       try (RequestScope requestContext = 
Tracing.getTracer().createRequestScope()) {
         String queryString = requestCtx.getQueryString();
-        PinotBrokerTimeSeriesResponse response = 
executeTimeSeriesQuery(language, queryString, requestContext,
+        PinotBrokerTimeSeriesResponse response = 
executeTimeSeriesQuery(language, queryString, Map.of(), requestContext,
           makeHttpIdentity(requestCtx), httpHeaders);
         if (response.getErrorType() != null && 
!response.getErrorType().isEmpty()) {
           
asyncResponse.resume(Response.serverError().entity(response).build());
@@ -538,9 +565,10 @@ public class PinotClientRequest {
   }
 
   private PinotBrokerTimeSeriesResponse executeTimeSeriesQuery(String 
language, String queryString,
-    RequestContext requestContext, RequesterIdentity requesterIdentity, 
HttpHeaders httpHeaders) {
-    return _requestHandler.handleTimeSeriesRequest(language, queryString, 
requestContext, requesterIdentity,
-      httpHeaders);
+      Map<String, String> queryParams, RequestContext requestContext, 
RequesterIdentity requesterIdentity,
+      HttpHeaders httpHeaders) {
+    return _requestHandler.handleTimeSeriesRequest(language, queryString, 
queryParams, requestContext,
+        requesterIdentity, httpHeaders);
   }
 
   public static HttpRequesterIdentity 
makeHttpIdentity(org.glassfish.grizzly.http.server.Request context) {
diff --git 
a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BrokerRequestHandler.java
 
b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BrokerRequestHandler.java
index cd6bdf64893..fbab6bbf8e6 100644
--- 
a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BrokerRequestHandler.java
+++ 
b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BrokerRequestHandler.java
@@ -65,7 +65,8 @@ public interface BrokerRequestHandler {
    * Run a query and use the time-series engine.
    */
   default PinotBrokerTimeSeriesResponse handleTimeSeriesRequest(String lang, 
String rawQueryParamString,
-      RequestContext requestContext, @Nullable RequesterIdentity 
requesterIdentity, HttpHeaders httpHeaders) {
+      Map<String, String> queryParams, RequestContext requestContext, 
@Nullable RequesterIdentity requesterIdentity,
+      HttpHeaders httpHeaders) {
     throw new UnsupportedOperationException("Handler does not support Time 
Series requests");
   }
 
diff --git 
a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BrokerRequestHandlerDelegate.java
 
b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BrokerRequestHandlerDelegate.java
index e9f5499a8ee..730fe380b71 100644
--- 
a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BrokerRequestHandlerDelegate.java
+++ 
b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BrokerRequestHandlerDelegate.java
@@ -125,10 +125,11 @@ public class BrokerRequestHandlerDelegate implements 
BrokerRequestHandler {
 
   @Override
   public PinotBrokerTimeSeriesResponse handleTimeSeriesRequest(String lang, 
String rawQueryParamString,
-      RequestContext requestContext, RequesterIdentity requesterIdentity, 
HttpHeaders httpHeaders) {
+      Map<String, String> queryParams, RequestContext requestContext, 
RequesterIdentity requesterIdentity,
+      HttpHeaders httpHeaders) {
     if (_timeSeriesRequestHandler != null) {
-      return _timeSeriesRequestHandler.handleTimeSeriesRequest(lang, 
rawQueryParamString, requestContext,
-        requesterIdentity, httpHeaders);
+      return _timeSeriesRequestHandler.handleTimeSeriesRequest(lang, 
rawQueryParamString, queryParams, requestContext,
+          requesterIdentity, httpHeaders);
     }
     return new PinotBrokerTimeSeriesResponse("error", null, "error", "Time 
series query engine not enabled.");
   }
diff --git 
a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/TimeSeriesRequestHandler.java
 
b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/TimeSeriesRequestHandler.java
index fb38c572cf5..0195d9f3e77 100644
--- 
a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/TimeSeriesRequestHandler.java
+++ 
b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/TimeSeriesRequestHandler.java
@@ -34,7 +34,6 @@ import javax.ws.rs.core.HttpHeaders;
 import javax.ws.rs.core.Response;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.hc.client5.http.io.HttpClientConnectionManager;
-import org.apache.http.NameValuePair;
 import org.apache.http.client.utils.URLEncodedUtils;
 import org.apache.pinot.broker.api.AccessControl;
 import org.apache.pinot.broker.broker.AccessControlFactory;
@@ -107,7 +106,8 @@ public class TimeSeriesRequestHandler extends 
BaseBrokerRequestHandler {
 
   @Override
   public PinotBrokerTimeSeriesResponse handleTimeSeriesRequest(String lang, 
String rawQueryParamString,
-      RequestContext requestContext, RequesterIdentity requesterIdentity, 
HttpHeaders httpHeaders) {
+      Map<String, String> queryParams, RequestContext requestContext, 
RequesterIdentity requesterIdentity,
+      HttpHeaders httpHeaders) {
     PinotBrokerTimeSeriesResponse timeSeriesResponse = null;
     long queryStartTime = System.currentTimeMillis();
     try {
@@ -117,7 +117,7 @@ public class TimeSeriesRequestHandler extends 
BaseBrokerRequestHandler {
       RangeTimeSeriesRequest timeSeriesRequest = null;
       firstStageAccessControlCheck(requesterIdentity);
       try {
-        timeSeriesRequest = buildRangeTimeSeriesRequest(lang, 
rawQueryParamString);
+        timeSeriesRequest = buildRangeTimeSeriesRequest(lang, 
rawQueryParamString, queryParams);
       } catch (URISyntaxException e) {
         return PinotBrokerTimeSeriesResponse.newErrorResponse("BAD_REQUEST", 
"Error building RangeTimeSeriesRequest");
       }
@@ -169,57 +169,41 @@ public class TimeSeriesRequestHandler extends 
BaseBrokerRequestHandler {
     return false;
   }
 
-  private RangeTimeSeriesRequest buildRangeTimeSeriesRequest(String language, 
String queryParamString)
+  private RangeTimeSeriesRequest buildRangeTimeSeriesRequest(String language, 
String queryParamString,
+      Map<String, String> queryParams)
       throws URISyntaxException {
-    List<NameValuePair> pairs = URLEncodedUtils.parse(
-        new URI("http://localhost?"; + queryParamString), "UTF-8");
-    String query = null;
-    Long startTs = null;
-    Long endTs = null;
-    String step = null;
-    String timeoutStr = null;
-    int limit = RangeTimeSeriesRequest.DEFAULT_SERIES_LIMIT;
-    int numGroupsLimit = RangeTimeSeriesRequest.DEFAULT_NUM_GROUPS_LIMIT;
-    for (NameValuePair nameValuePair : pairs) {
-      switch (nameValuePair.getName()) {
-        case "query":
-          query = nameValuePair.getValue();
-          break;
-        case "start":
-          startTs = Long.parseLong(nameValuePair.getValue());
-          break;
-        case "end":
-          endTs = Long.parseLong(nameValuePair.getValue());
-          break;
-        case "step":
-          step = nameValuePair.getValue();
-          break;
-        case "timeout":
-          timeoutStr = nameValuePair.getValue();
-          break;
-        case "limit":
-          limit = Integer.parseInt(nameValuePair.getValue());
-          break;
-        case "numGroupsLimit":
-          numGroupsLimit = Integer.parseInt(nameValuePair.getValue());
-          break;
-        default:
-          /* Okay to ignore unknown parameters since the language implementor 
may be using them. */
-          break;
-      }
+    Map<String, String> mergedParams = new HashMap<>(queryParams);
+    // If queryParams is empty, parse the queryParamString to extract 
parameters.
+    if (queryParams.isEmpty()) {
+      URLEncodedUtils.parse(new URI("http://localhost?"; + queryParamString), 
"UTF-8")
+          .forEach(pair -> mergedParams.putIfAbsent(pair.getName(), 
pair.getValue()));
     }
-    Long stepSeconds = getStepSeconds(step);
+
+    String query = mergedParams.get("query");
+    Long startTs = parseLongSafe(mergedParams.get("start"));
+    Long endTs = parseLongSafe(mergedParams.get("end"));
+    Long stepSeconds = getStepSeconds(mergedParams.get("step"));
+    Duration timeout = StringUtils.isNotBlank(mergedParams.get("timeout"))
+        ? HumanReadableDuration.from(mergedParams.get("timeout")) : 
Duration.ofMillis(_brokerTimeoutMs);
+
     Preconditions.checkNotNull(query, "Query cannot be null");
     Preconditions.checkNotNull(startTs, "Start time cannot be null");
     Preconditions.checkNotNull(endTs, "End time cannot be null");
     Preconditions.checkState(stepSeconds != null && stepSeconds > 0, "Step 
must be a positive integer");
-    Duration timeout = Duration.ofMillis(_brokerTimeoutMs);
-    if (StringUtils.isNotBlank(timeoutStr)) {
-      timeout = HumanReadableDuration.from(timeoutStr);
-    }
-    // TODO: Pass full raw query param string to the request
-    return new RangeTimeSeriesRequest(language, query, startTs, endTs, 
stepSeconds, timeout, limit, numGroupsLimit,
-        queryParamString);
+
+    return new RangeTimeSeriesRequest(language, query, startTs, endTs, 
stepSeconds, timeout,
+        parseIntOrDefault(mergedParams.get("limit"), 
RangeTimeSeriesRequest.DEFAULT_SERIES_LIMIT),
+        parseIntOrDefault(mergedParams.get("numGroupsLimit"), 
RangeTimeSeriesRequest.DEFAULT_NUM_GROUPS_LIMIT),
+        queryParamString
+    );
+  }
+
+  private Long parseLongSafe(String value) {
+    return value != null ? Long.parseLong(value) : null;
+  }
+
+  private int parseIntOrDefault(String value, int defaultValue) {
+    return value != null ? Integer.parseInt(value) : defaultValue;
   }
 
   public static Long getStepSeconds(@Nullable String step) {
diff --git 
a/pinot-common/src/main/java/org/apache/pinot/common/response/PinotBrokerTimeSeriesResponse.java
 
b/pinot-common/src/main/java/org/apache/pinot/common/response/PinotBrokerTimeSeriesResponse.java
index ce48337f727..820ac01996b 100644
--- 
a/pinot-common/src/main/java/org/apache/pinot/common/response/PinotBrokerTimeSeriesResponse.java
+++ 
b/pinot-common/src/main/java/org/apache/pinot/common/response/PinotBrokerTimeSeriesResponse.java
@@ -25,11 +25,17 @@ import com.fasterxml.jackson.core.JsonProcessingException;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.base.Preconditions;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
+import org.apache.pinot.common.response.broker.BrokerResponseNativeV2;
+import org.apache.pinot.common.response.broker.QueryProcessingException;
+import org.apache.pinot.common.response.broker.ResultTable;
+import org.apache.pinot.common.utils.DataSchema;
 import org.apache.pinot.spi.annotations.InterfaceStability;
+import org.apache.pinot.spi.exception.QueryErrorCode;
 import org.apache.pinot.tsdb.spi.series.TimeSeries;
 import org.apache.pinot.tsdb.spi.series.TimeSeriesBlock;
 
@@ -108,6 +114,61 @@ public class PinotBrokerTimeSeriesResponse {
     return convertBucketedSeriesBlock(seriesBlock);
   }
 
+  public BrokerResponse toBrokerResponse() {
+    BrokerResponseNativeV2 brokerResponse = new BrokerResponseNativeV2();
+    if (_errorType != null) {
+      // TODO: Introduce proper error code based exception handling for 
timeseries.
+      brokerResponse.addException(new 
QueryProcessingException(QueryErrorCode.UNKNOWN, _errorType + ": "
+          + _error));
+      return brokerResponse;
+    }
+    DataSchema dataSchema = deriveBrokerResponseDataSchema(_data);
+    ResultTable resultTable = new ResultTable(dataSchema, 
deriveBrokerResponseRows(_data, dataSchema.getColumnNames()));
+    brokerResponse.setResultTable(resultTable);
+    return brokerResponse;
+  }
+
+  private List<Object[]> deriveBrokerResponseRows(Data data, String[] 
columnNames) {
+    List<Object[]> rows = new ArrayList<>();
+    if (columnNames.length == 0) {
+      return rows;
+    }
+    for (Value value : data.getResult()) {
+      Long[] ts = Arrays.stream(value.getValues()).map(entry -> (Long) 
entry[0]).toArray(Long[]::new);
+      Double[] values = Arrays.stream(value.getValues())
+          .map(entry -> entry[1] == null ? null : 
Double.valueOf(String.valueOf(entry[1])))
+          .toArray(Double[]::new);
+
+      Object[] row = new Object[columnNames.length];
+      int index = 0;
+      for (String columnName : columnNames) {
+        if ("ts".equals(columnName)) {
+          row[index] = ts;
+        } else if ("values".equals(columnName)) {
+          row[index] = values;
+        } else {
+          row[index] = value.getMetric().getOrDefault(columnName, null);
+        }
+        index++;
+      }
+      rows.add(row);
+    }
+    return rows;
+  }
+
+  private DataSchema deriveBrokerResponseDataSchema(Data data) {
+    List<String> columnNames = new ArrayList<>(List.of("ts", "values", 
"__name__"));
+    List<DataSchema.ColumnDataType> columnTypes = new ArrayList<>(List.of(
+        DataSchema.ColumnDataType.LONG_ARRAY, 
DataSchema.ColumnDataType.DOUBLE_ARRAY));
+    if (!data.getResult().isEmpty()) {
+      data.getResult().get(0).getMetric().forEach((key, value) -> {
+        columnNames.add(key);
+        columnTypes.add(DataSchema.ColumnDataType.STRING);
+      });
+    }
+    return new DataSchema(columnNames.toArray(new String[0]), 
columnTypes.toArray(new DataSchema.ColumnDataType[0]));
+  }
+
   private static PinotBrokerTimeSeriesResponse 
convertBucketedSeriesBlock(TimeSeriesBlock seriesBlock) {
     Long[] timeValues = 
Objects.requireNonNull(seriesBlock.getTimeBuckets()).getTimeBuckets();
     List<PinotBrokerTimeSeriesResponse.Value> result = new ArrayList<>();
diff --git 
a/pinot-integration-test-base/src/test/java/org/apache/pinot/integration/tests/ClusterTest.java
 
b/pinot-integration-test-base/src/test/java/org/apache/pinot/integration/tests/ClusterTest.java
index 70e131be55d..476003cff72 100644
--- 
a/pinot-integration-test-base/src/test/java/org/apache/pinot/integration/tests/ClusterTest.java
+++ 
b/pinot-integration-test-base/src/test/java/org/apache/pinot/integration/tests/ClusterTest.java
@@ -580,6 +580,18 @@ public abstract class ClusterTest extends ControllerTest {
     }
   }
 
+  public JsonNode postTimeseriesQuery(String baseUrl, String query, long 
startTime, long endTime,
+      Map<String, String> headers) {
+    try {
+      Map<String, String> payload = Map.of("language", "m3ql", "query", query, 
"start",
+          String.valueOf(startTime), "end", String.valueOf(endTime));
+      return JsonUtils.stringToJsonNode(
+          sendPostRequest(baseUrl + "/query/timeseries", 
JsonUtils.objectToString(payload), headers));
+    } catch (Exception e) {
+      throw new RuntimeException("Failed to post timeseries query: " + query, 
e);
+    }
+  }
+
   /**
    * Queries the broker's query endpoint (/query/sql)
    */
diff --git 
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/TimeSeriesIntegrationTest.java
 
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/TimeSeriesIntegrationTest.java
index d7490418e16..c472474a845 100644
--- 
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/TimeSeriesIntegrationTest.java
+++ 
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/TimeSeriesIntegrationTest.java
@@ -19,9 +19,13 @@
 package org.apache.pinot.integration.tests;
 
 import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.node.ArrayNode;
 import com.google.common.collect.ImmutableList;
 import java.io.File;
+import java.util.ArrayList;
 import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 import org.apache.avro.file.DataFileWriter;
 import org.apache.avro.generic.GenericData;
@@ -32,6 +36,7 @@ import org.apache.pinot.spi.config.table.TableType;
 import org.apache.pinot.spi.data.FieldSpec;
 import org.apache.pinot.spi.data.Schema;
 import org.apache.pinot.spi.env.PinotConfiguration;
+import org.apache.pinot.spi.utils.JsonUtils;
 import org.apache.pinot.spi.utils.builder.ControllerRequestURLBuilder;
 import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
 import org.apache.pinot.tsdb.spi.PinotTimeSeriesConfiguration;
@@ -41,9 +46,11 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.testng.annotations.AfterClass;
 import org.testng.annotations.BeforeClass;
+import org.testng.annotations.DataProvider;
 import org.testng.annotations.Test;
 
 import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNotNull;
 
 public class TimeSeriesIntegrationTest extends BaseClusterIntegrationTest {
 
@@ -62,39 +69,39 @@ public class TimeSeriesIntegrationTest extends 
BaseClusterIntegrationTest {
   private static final long QUERY_START_TIME_SEC = DATA_START_TIME_SEC - 60; 
// 1 minute before start time
   private static final long QUERY_END_TIME_SEC = DATA_START_TIME_SEC + 300; // 
5 minutes after start time
 
-  @Test
-  public void testGroupByMax() {
+  @Test(dataProvider = "isBrokerResponseCompatible")
+  public void testGroupByMax(boolean isBrokerResponseCompatible) {
     String query = String.format(
       
"fetch{table=\"mytable_OFFLINE\",filter=\"\",ts_column=\"%s\",ts_unit=\"MILLISECONDS\",value=\"%s\"}"
         + " | max{%s} | transformNull{0} | keepLastValue{}",
       TS_COLUMN, TOTAL_TRIPS_COLUMN, DEVICE_OS_COLUMN
     );
-    runGroupedTimeSeriesQuery(query, 3, (ts, val, row) ->
+    runGroupedTimeSeriesQuery(query, 3, isBrokerResponseCompatible, (ts, val, 
row) ->
       assertEquals(val, ts <= DATA_START_TIME_SEC ? 0L : VIEWS_MAX_VALUE)
     );
   }
 
-  @Test
-  public void testGroupByMin() {
+  @Test(dataProvider = "isBrokerResponseCompatible")
+  public void testGroupByMin(boolean isBrokerResponseCompatible) {
     String query = String.format(
       
"fetch{table=\"mytable_OFFLINE\",filter=\"\",ts_column=\"%s\",ts_unit=\"MILLISECONDS\",value=\"%s\"}"
         + " | min{%s} | transformNull{0} | keepLastValue{}",
       TS_COLUMN, TOTAL_TRIPS_COLUMN, DAYS_SINCE_FIRST_TRIP_COLUMN
     );
-    runGroupedTimeSeriesQuery(query, 5, (ts, val, row) ->
+    runGroupedTimeSeriesQuery(query, 5, isBrokerResponseCompatible, (ts, val, 
row) ->
       assertEquals(val, ts <= DATA_START_TIME_SEC ? 0L : VIEWS_MIN_VALUE)
     );
   }
 
-  @Test
-  public void testGroupBySum() {
+  @Test(dataProvider = "isBrokerResponseCompatible")
+  public void testGroupBySum(boolean isBrokerResponseCompatible) {
     String query = String.format(
       
"fetch{table=\"mytable_OFFLINE\",filter=\"\",ts_column=\"%s\",ts_unit=\"MILLISECONDS\",value=\"%s\"}"
         + " | sum{%s} | transformNull{0} | keepLastValue{}",
       TS_COLUMN, TOTAL_TRIPS_COLUMN, REFERRAL_COLUMN
     );
-    runGroupedTimeSeriesQuery(query, 2, (ts, val, row) -> {
-      String referral = row.get("metric").get(REFERRAL_COLUMN).asText();
+    runGroupedTimeSeriesQuery(query, 2, isBrokerResponseCompatible, (ts, val, 
row) -> {
+      String referral = row.get(REFERRAL_COLUMN);
       long expected = ts <= DATA_START_TIME_SEC ? 0L
         // If referral is true, views are MAX_VALUE, otherwise 20
         : "1".equals(referral) ? 30 * VIEWS_MIN_VALUE : 30 * VIEWS_MAX_VALUE;
@@ -102,65 +109,65 @@ public class TimeSeriesIntegrationTest extends 
BaseClusterIntegrationTest {
     });
   }
 
-  @Test
-  public void testGroupByTwoColumnsAndExpressionValue() {
+  @Test(dataProvider = "isBrokerResponseCompatible")
+  public void testGroupByTwoColumnsAndExpressionValue(boolean 
isBrokerResponseCompatible) {
     String query = String.format(
       
"fetch{table=\"mytable_OFFLINE\",filter=\"\",ts_column=\"%s\",ts_unit=\"MILLISECONDS\",value=\"%s*10\"}"
         + " | max{%s,%s} | transformNull{0} | keepLastValue{}",
       TS_COLUMN, TOTAL_TRIPS_COLUMN, DEVICE_OS_COLUMN, 
DAYS_SINCE_FIRST_TRIP_COLUMN
     );
-    runGroupedTimeSeriesQuery(query, 15, (ts, val, row) -> {
+    runGroupedTimeSeriesQuery(query, 15, isBrokerResponseCompatible, (ts, val, 
row) -> {
       long expected = ts <= DATA_START_TIME_SEC ? 0L : 10 * VIEWS_MAX_VALUE;
       assertEquals(val, expected);
     });
   }
 
-  @Test
-  public void testGroupByThreeColumnsAndConstantValue() {
+  @Test(dataProvider = "isBrokerResponseCompatible")
+  public void testGroupByThreeColumnsAndConstantValue(boolean 
isBrokerResponseCompatible) {
     String query = String.format(
       
"fetch{table=\"mytable_OFFLINE\",filter=\"\",ts_column=\"%s\",ts_unit=\"MILLISECONDS\",value=\"1\"}"
         + " | sum{%s,%s,%s} | transformNull{0} | keepLastValue{}",
       TS_COLUMN, TOTAL_TRIPS_COLUMN, DEVICE_OS_COLUMN, 
DAYS_SINCE_FIRST_TRIP_COLUMN, REFERRAL_COLUMN
     );
-    runGroupedTimeSeriesQuery(query, 30, (ts, val, row) -> {
+    runGroupedTimeSeriesQuery(query, 30, isBrokerResponseCompatible, (ts, val, 
row) -> {
       // Since there are 30 groups, each minute will have 2 rows.
       long expected = ts <= DATA_START_TIME_SEC ? 0L : 2L;
       assertEquals(val, expected);
     });
   }
 
-  @Test
-  public void testGroupByWithFilter() {
+  @Test(dataProvider = "isBrokerResponseCompatible")
+  public void testGroupByWithFilter(boolean isBrokerResponseCompatible) {
     String query = String.format(
       
"fetch{table=\"mytable_OFFLINE\",filter=\"%s='windows'\",ts_column=\"%s\",ts_unit=\"MILLISECONDS\",value=\"1\"}"
         + " | sum{%s,%s,%s} | transformNull{0} | keepLastValue{}",
       DEVICE_OS_COLUMN, TS_COLUMN, TOTAL_TRIPS_COLUMN, DEVICE_OS_COLUMN, 
DAYS_SINCE_FIRST_TRIP_COLUMN, REFERRAL_COLUMN
     );
-    runGroupedTimeSeriesQuery(query, 10, (ts, val, row) ->
+    runGroupedTimeSeriesQuery(query, 10, isBrokerResponseCompatible, (ts, val, 
row) ->
       assertEquals(val, ts <= DATA_START_TIME_SEC ? 0L : 2L)
     );
   }
 
-  @Test
-  public void testTransformNull() {
+  @Test(dataProvider = "isBrokerResponseCompatible")
+  public void testTransformNull(boolean isBrokerResponseCompatible) {
     String query = String.format(
       
"fetch{table=\"mytable_OFFLINE\",filter=\"\",ts_column=\"%s\",ts_unit=\"MILLISECONDS\",value=\"%s\"}"
         + " | max{%s} | transformNull{42} | keepLastValue{}",
       TS_COLUMN, TOTAL_TRIPS_COLUMN, DEVICE_OS_COLUMN
     );
-    runGroupedTimeSeriesQuery(query, 3, (ts, val, row) ->
+    runGroupedTimeSeriesQuery(query, 3, isBrokerResponseCompatible, (ts, val, 
row) ->
       assertEquals(val, ts <= DATA_START_TIME_SEC ? 42L : VIEWS_MAX_VALUE)
     );
   }
 
-  @Test
-  public void testTableWithoutType() {
+  @Test(dataProvider = "isBrokerResponseCompatible")
+  public void testTableWithoutType(boolean isBrokerResponseCompatible) {
     String query = String.format(
       
"fetch{table=\"mytable\",filter=\"\",ts_column=\"%s\",ts_unit=\"MILLISECONDS\",value=\"%s\"}"
         + " | max{%s} | transformNull{0} | keepLastValue{}",
       TS_COLUMN, TOTAL_TRIPS_COLUMN, DEVICE_OS_COLUMN
     );
-    runGroupedTimeSeriesQuery(query, 3, (ts, val, row) ->
+    runGroupedTimeSeriesQuery(query, 3, isBrokerResponseCompatible, (ts, val, 
row) ->
       assertEquals(val, ts <= DATA_START_TIME_SEC ? 0L : VIEWS_MAX_VALUE)
     );
   }
@@ -198,27 +205,86 @@ public class TimeSeriesIntegrationTest extends 
BaseClusterIntegrationTest {
     assertEquals(statusCodeAndResponse.getRight(), "[\"m3ql\"]");
   }
 
+  @DataProvider(name = "isBrokerResponseCompatible")
+  public Object[][] isBrokerResponseCompatible() {
+    return new Object[][]{
+        {false}, {true}
+    };
+  }
+
   protected Map<String, String> getHeaders() {
     return Collections.emptyMap();
   }
 
-  private void runGroupedTimeSeriesQuery(String query, int expectedGroups, 
TimeSeriesValidator validator) {
-    JsonNode result = getTimeseriesQuery(query, QUERY_START_TIME_SEC, 
QUERY_END_TIME_SEC, getHeaders());
-    System.out.println(result);
-    assertEquals(result.get("status").asText(), "success");
+  private void runGroupedTimeSeriesQuery(String query, int expectedGroups, 
boolean isBrokerResponseCompatible,
+      TimeSeriesValidator validator) {
+    JsonNode result = isBrokerResponseCompatible
+        ? postTimeseriesQuery(getBrokerBaseApiUrl(), query, 
QUERY_START_TIME_SEC, QUERY_END_TIME_SEC, getHeaders())
+        : getTimeseriesQuery(query, QUERY_START_TIME_SEC, QUERY_END_TIME_SEC, 
getHeaders());
 
-    JsonNode series = result.get("data").get("result");
-    assertEquals(series.size(), expectedGroups);
+    if (isBrokerResponseCompatible) {
+      validateBrokerResponse(result, expectedGroups, validator);
+    } else {
+      validatePrometheusResponse(result, expectedGroups, validator);
+    }
+  }
+
+  private void validatePrometheusResponse(JsonNode result, int expectedGroups, 
TimeSeriesValidator validator) {
+    assertEquals("success", result.path("status").asText());
+
+    JsonNode series = result.path("data").path("result");
+    assertEquals(expectedGroups, series.size());
 
     for (JsonNode row : series) {
-      for (JsonNode point : row.get("values")) {
+      Map<String, String> metric;
+      try {
+        metric = JsonUtils.jsonNodeToStringMap(row.path("metric"));
+      } catch (Exception e) {
+        throw new RuntimeException("Failed to parse metric from row", e);
+      }
+      for (JsonNode point : row.path("values")) {
         long ts = point.get(0).asLong();
         long val = point.get(1).asLong();
-        validator.validate(ts, val, row);
+        validator.validate(ts, val, metric);
       }
     }
   }
 
+  private void validateBrokerResponse(JsonNode result, int expectedGroups, 
TimeSeriesValidator validator) {
+    assertNotNull(result);
+    assertEquals(expectedGroups, result.path("numRowsResultSet").asInt());
+
+    JsonNode resultTable = result.path("resultTable");
+    assertNotNull(resultTable);
+
+    List<String> columnNames = 
extractStrings(resultTable.path("dataSchema").path("columnNames"));
+    JsonNode rows = resultTable.path("rows");
+
+    for (JsonNode jsonRow : rows) {
+      ArrayNode row = (ArrayNode) jsonRow;
+      assertEquals(columnNames.size(), row.size());
+
+      ArrayNode tsArray = (ArrayNode) row.get(0);
+      ArrayNode valArray = (ArrayNode) row.get(1);
+      assertEquals(tsArray.size(), valArray.size());
+
+      Map<String, String> metric = new HashMap<>();
+      for (int i = 2; i < row.size(); i++) {
+        metric.put(columnNames.get(i), row.get(i).asText());
+      }
+
+      for (int i = 0; i < tsArray.size(); i++) {
+        validator.validate(tsArray.get(i).asLong(), valArray.get(i).asLong(), 
metric);
+      }
+    }
+  }
+
+  private List<String> extractStrings(JsonNode arrayNode) {
+    List<String> result = new ArrayList<>();
+    arrayNode.forEach(node -> result.add(node.asText()));
+    return result;
+  }
+
   @Override
   protected void overrideControllerConf(Map<String, Object> properties) {
     
properties.put(PinotTimeSeriesConfiguration.getEnabledLanguagesConfigKey(), 
"m3ql");
@@ -359,6 +425,6 @@ public class TimeSeriesIntegrationTest extends 
BaseClusterIntegrationTest {
 
   @FunctionalInterface
   interface TimeSeriesValidator {
-    void validate(long timestamp, long value, JsonNode row);
+    void validate(long timestamp, long value, Map<String, String> metricMap);
   }
 }
diff --git 
a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java 
b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
index 3ab38c56a03..82b18210791 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
@@ -556,6 +556,8 @@ public class CommonConstants {
       public static final String SQL_V2 = "sqlV2";
       public static final String TRACE = "trace";
       public static final String QUERY_OPTIONS = "queryOptions";
+      public static final String LANGUAGE = "language";
+      public static final String QUERY = "query";
 
       public static class QueryOptionKey {
         public static final String TIMEOUT_MS = "timeoutMs";
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/JsonUtils.java 
b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/JsonUtils.java
index 6ae9d6670ee..535cccdebb2 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/JsonUtils.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/JsonUtils.java
@@ -158,6 +158,11 @@ public class JsonUtils {
     return DEFAULT_READER.readTree(jsonString);
   }
 
+  public static Map<String, String> jsonNodeToStringMap(JsonNode jsonNode)
+      throws IOException {
+    return DEFAULT_READER.forType(MAP_TYPE_REFERENCE).readValue(jsonNode);
+  }
+
   public static JsonNode stringToJsonNodeWithBigDecimal(String jsonString)
       throws IOException {
     return READER_WITH_BIG_DECIMAL.readTree(jsonString);


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to