This is an automated email from the ASF dual-hosted git repository.
ankitsultana pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 7f55d4f1f63 [timeseries] Introducing POST /query/timeseries as a BrokerResponse compatible timeseries API (#16531)
7f55d4f1f63 is described below
commit 7f55d4f1f6327dd360961b982ba0693451e2b96f
Author: Shaurya Chaturvedi <[email protected]>
AuthorDate: Tue Aug 12 20:39:46 2025 -0700
[timeseries] Introducing POST /query/timeseries as a BrokerResponse compatible timeseries API (#16531)
Co-authored-by: Shaurya Chaturvedi <[email protected]>
---
.../broker/api/resources/PinotClientRequest.java | 36 +++++-
.../requesthandler/BrokerRequestHandler.java | 3 +-
.../BrokerRequestHandlerDelegate.java | 7 +-
.../requesthandler/TimeSeriesRequestHandler.java | 80 +++++-------
.../response/PinotBrokerTimeSeriesResponse.java | 61 ++++++++++
.../pinot/integration/tests/ClusterTest.java | 12 ++
.../tests/TimeSeriesIntegrationTest.java | 134 +++++++++++++++------
.../apache/pinot/spi/utils/CommonConstants.java | 2 +
.../java/org/apache/pinot/spi/utils/JsonUtils.java | 5 +
9 files changed, 250 insertions(+), 90 deletions(-)
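The new POST /query/timeseries endpoint accepts a JSON object whose keys mirror the existing GET query parameters (language, query, start, end, and optionally step, timeout, limit, numGroupsLimit) and returns a BrokerResponse-compatible payload. As a minimal sketch of invoking it with java.net.http — the broker address, query text, and time range below are placeholders, not values taken from this commit:

import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

public class TimeSeriesQueryPostExample {
  public static void main(String[] args) throws Exception {
    // Placeholder values: substitute a real broker address, m3ql query, and epoch-seconds range.
    String payload = "{\"language\": \"m3ql\", \"query\": \"<m3ql query>\", "
        + "\"start\": \"1720000000\", \"end\": \"1720000300\"}";
    HttpRequest request = HttpRequest.newBuilder()
        .uri(URI.create("http://localhost:8099/query/timeseries"))
        .header("Content-Type", "application/json")
        .POST(HttpRequest.BodyPublishers.ofString(payload))
        .build();
    HttpResponse<String> response = HttpClient.newHttpClient()
        .send(request, HttpResponse.BodyHandlers.ofString());
    // The body is BrokerResponse-compatible JSON (resultTable, numRowsResultSet, exceptions).
    System.out.println(response.body());
  }
}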
diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/api/resources/PinotClientRequest.java b/pinot-broker/src/main/java/org/apache/pinot/broker/api/resources/PinotClientRequest.java
index 55c943cbd67..dc49e0aec73 100644
--- a/pinot-broker/src/main/java/org/apache/pinot/broker/api/resources/PinotClientRequest.java
+++ b/pinot-broker/src/main/java/org/apache/pinot/broker/api/resources/PinotClientRequest.java
@@ -280,6 +280,33 @@ public class PinotClientRequest {
}
}
+ @POST
+ @ManagedAsync
+ @Produces(MediaType.APPLICATION_JSON)
+ @Path("query/timeseries")
+ @ApiOperation(value = "Query Pinot using the Time Series Engine")
+ @ManualAuthorization
+ public void processTimeSeriesQueryEngine(Map<String, String> queryParams, @Suspended AsyncResponse asyncResponse,
+ @Context org.glassfish.grizzly.http.server.Request requestCtx, @Context HttpHeaders httpHeaders) {
+ try {
+ if (!queryParams.containsKey(Request.QUERY)) {
+ throw new IllegalStateException("Payload is missing the query string field 'query'");
+ }
+ String language = queryParams.get(Request.LANGUAGE);
+ String queryString = queryParams.get(Request.QUERY);
+ try (RequestScope requestContext = Tracing.getTracer().createRequestScope()) {
+ PinotBrokerTimeSeriesResponse response = executeTimeSeriesQuery(language, queryString, queryParams,
+ requestContext, makeHttpIdentity(requestCtx), httpHeaders);
+ asyncResponse.resume(response.toBrokerResponse());
+ }
+ } catch (Exception e) {
+ LOGGER.error("Caught exception while processing POST timeseries request", e);
+ _brokerMetrics.addMeteredGlobalValue(BrokerMeter.UNCAUGHT_POST_EXCEPTIONS, 1L);
+ asyncResponse.resume(new WebApplicationException(e,
+ Response.status(Response.Status.INTERNAL_SERVER_ERROR).entity(e.getMessage()).build()));
+ }
+ }
+
@GET
@ManagedAsync
@Produces(MediaType.APPLICATION_JSON)
@@ -293,7 +320,7 @@ public class PinotClientRequest {
try {
try (RequestScope requestContext = Tracing.getTracer().createRequestScope()) {
String queryString = requestCtx.getQueryString();
- PinotBrokerTimeSeriesResponse response = executeTimeSeriesQuery(language, queryString, requestContext,
+ PinotBrokerTimeSeriesResponse response = executeTimeSeriesQuery(language, queryString, Map.of(), requestContext,
makeHttpIdentity(requestCtx), httpHeaders);
if (response.getErrorType() != null && !response.getErrorType().isEmpty()) {
asyncResponse.resume(Response.serverError().entity(response).build());
@@ -538,9 +565,10 @@ public class PinotClientRequest {
}
private PinotBrokerTimeSeriesResponse executeTimeSeriesQuery(String language, String queryString,
- RequestContext requestContext, RequesterIdentity requesterIdentity, HttpHeaders httpHeaders) {
- return _requestHandler.handleTimeSeriesRequest(language, queryString, requestContext, requesterIdentity,
- httpHeaders);
+ Map<String, String> queryParams, RequestContext requestContext, RequesterIdentity requesterIdentity,
+ HttpHeaders httpHeaders) {
+ return _requestHandler.handleTimeSeriesRequest(language, queryString, queryParams, requestContext,
+ requesterIdentity, httpHeaders);
}
public static HttpRequesterIdentity makeHttpIdentity(org.glassfish.grizzly.http.server.Request context) {
diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BrokerRequestHandler.java b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BrokerRequestHandler.java
index cd6bdf64893..fbab6bbf8e6 100644
--- a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BrokerRequestHandler.java
+++ b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BrokerRequestHandler.java
@@ -65,7 +65,8 @@ public interface BrokerRequestHandler {
* Run a query and use the time-series engine.
*/
default PinotBrokerTimeSeriesResponse handleTimeSeriesRequest(String lang, String rawQueryParamString,
- RequestContext requestContext, @Nullable RequesterIdentity requesterIdentity, HttpHeaders httpHeaders) {
+ Map<String, String> queryParams, RequestContext requestContext, @Nullable RequesterIdentity requesterIdentity,
+ HttpHeaders httpHeaders) {
throw new UnsupportedOperationException("Handler does not support Time Series requests");
}
diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BrokerRequestHandlerDelegate.java b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BrokerRequestHandlerDelegate.java
index e9f5499a8ee..730fe380b71 100644
--- a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BrokerRequestHandlerDelegate.java
+++ b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BrokerRequestHandlerDelegate.java
@@ -125,10 +125,11 @@ public class BrokerRequestHandlerDelegate implements BrokerRequestHandler {
@Override
public PinotBrokerTimeSeriesResponse handleTimeSeriesRequest(String lang, String rawQueryParamString,
- RequestContext requestContext, RequesterIdentity requesterIdentity, HttpHeaders httpHeaders) {
+ Map<String, String> queryParams, RequestContext requestContext, RequesterIdentity requesterIdentity,
+ HttpHeaders httpHeaders) {
if (_timeSeriesRequestHandler != null) {
- return _timeSeriesRequestHandler.handleTimeSeriesRequest(lang, rawQueryParamString, requestContext,
- requesterIdentity, httpHeaders);
+ return _timeSeriesRequestHandler.handleTimeSeriesRequest(lang, rawQueryParamString, queryParams, requestContext,
+ requesterIdentity, httpHeaders);
}
return new PinotBrokerTimeSeriesResponse("error", null, "error", "Time series query engine not enabled.");
}
diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/TimeSeriesRequestHandler.java b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/TimeSeriesRequestHandler.java
index fb38c572cf5..0195d9f3e77 100644
--- a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/TimeSeriesRequestHandler.java
+++ b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/TimeSeriesRequestHandler.java
@@ -34,7 +34,6 @@ import javax.ws.rs.core.HttpHeaders;
import javax.ws.rs.core.Response;
import org.apache.commons.lang3.StringUtils;
import org.apache.hc.client5.http.io.HttpClientConnectionManager;
-import org.apache.http.NameValuePair;
import org.apache.http.client.utils.URLEncodedUtils;
import org.apache.pinot.broker.api.AccessControl;
import org.apache.pinot.broker.broker.AccessControlFactory;
@@ -107,7 +106,8 @@ public class TimeSeriesRequestHandler extends BaseBrokerRequestHandler {
@Override
public PinotBrokerTimeSeriesResponse handleTimeSeriesRequest(String lang, String rawQueryParamString,
- RequestContext requestContext, RequesterIdentity requesterIdentity, HttpHeaders httpHeaders) {
+ Map<String, String> queryParams, RequestContext requestContext, RequesterIdentity requesterIdentity,
+ HttpHeaders httpHeaders) {
PinotBrokerTimeSeriesResponse timeSeriesResponse = null;
long queryStartTime = System.currentTimeMillis();
try {
@@ -117,7 +117,7 @@ public class TimeSeriesRequestHandler extends BaseBrokerRequestHandler {
RangeTimeSeriesRequest timeSeriesRequest = null;
firstStageAccessControlCheck(requesterIdentity);
try {
- timeSeriesRequest = buildRangeTimeSeriesRequest(lang, rawQueryParamString);
+ timeSeriesRequest = buildRangeTimeSeriesRequest(lang, rawQueryParamString, queryParams);
} catch (URISyntaxException e) {
return PinotBrokerTimeSeriesResponse.newErrorResponse("BAD_REQUEST", "Error building RangeTimeSeriesRequest");
}
@@ -169,57 +169,41 @@ public class TimeSeriesRequestHandler extends BaseBrokerRequestHandler {
return false;
}
- private RangeTimeSeriesRequest buildRangeTimeSeriesRequest(String language, String queryParamString)
+ private RangeTimeSeriesRequest buildRangeTimeSeriesRequest(String language, String queryParamString,
+ Map<String, String> queryParams)
throws URISyntaxException {
- List<NameValuePair> pairs = URLEncodedUtils.parse(
- new URI("http://localhost?" + queryParamString), "UTF-8");
- String query = null;
- Long startTs = null;
- Long endTs = null;
- String step = null;
- String timeoutStr = null;
- int limit = RangeTimeSeriesRequest.DEFAULT_SERIES_LIMIT;
- int numGroupsLimit = RangeTimeSeriesRequest.DEFAULT_NUM_GROUPS_LIMIT;
- for (NameValuePair nameValuePair : pairs) {
- switch (nameValuePair.getName()) {
- case "query":
- query = nameValuePair.getValue();
- break;
- case "start":
- startTs = Long.parseLong(nameValuePair.getValue());
- break;
- case "end":
- endTs = Long.parseLong(nameValuePair.getValue());
- break;
- case "step":
- step = nameValuePair.getValue();
- break;
- case "timeout":
- timeoutStr = nameValuePair.getValue();
- break;
- case "limit":
- limit = Integer.parseInt(nameValuePair.getValue());
- break;
- case "numGroupsLimit":
- numGroupsLimit = Integer.parseInt(nameValuePair.getValue());
- break;
- default:
- /* Okay to ignore unknown parameters since the language implementor may be using them. */
- break;
- }
+ Map<String, String> mergedParams = new HashMap<>(queryParams);
+ // If queryParams is empty, parse the queryParamString to extract parameters.
+ if (queryParams.isEmpty()) {
+ URLEncodedUtils.parse(new URI("http://localhost?" + queryParamString), "UTF-8")
+ .forEach(pair -> mergedParams.putIfAbsent(pair.getName(), pair.getValue()));
}
- Long stepSeconds = getStepSeconds(step);
+
+ String query = mergedParams.get("query");
+ Long startTs = parseLongSafe(mergedParams.get("start"));
+ Long endTs = parseLongSafe(mergedParams.get("end"));
+ Long stepSeconds = getStepSeconds(mergedParams.get("step"));
+ Duration timeout = StringUtils.isNotBlank(mergedParams.get("timeout"))
+ ? HumanReadableDuration.from(mergedParams.get("timeout")) : Duration.ofMillis(_brokerTimeoutMs);
+
Preconditions.checkNotNull(query, "Query cannot be null");
Preconditions.checkNotNull(startTs, "Start time cannot be null");
Preconditions.checkNotNull(endTs, "End time cannot be null");
Preconditions.checkState(stepSeconds != null && stepSeconds > 0, "Step must be a positive integer");
- Duration timeout = Duration.ofMillis(_brokerTimeoutMs);
- if (StringUtils.isNotBlank(timeoutStr)) {
- timeout = HumanReadableDuration.from(timeoutStr);
- }
- // TODO: Pass full raw query param string to the request
- return new RangeTimeSeriesRequest(language, query, startTs, endTs, stepSeconds, timeout, limit, numGroupsLimit,
- queryParamString);
+
+ return new RangeTimeSeriesRequest(language, query, startTs, endTs, stepSeconds, timeout,
+ parseIntOrDefault(mergedParams.get("limit"), RangeTimeSeriesRequest.DEFAULT_SERIES_LIMIT),
+ parseIntOrDefault(mergedParams.get("numGroupsLimit"), RangeTimeSeriesRequest.DEFAULT_NUM_GROUPS_LIMIT),
+ queryParamString
+ );
+ }
+
+ private Long parseLongSafe(String value) {
+ return value != null ? Long.parseLong(value) : null;
+ }
+
+ private int parseIntOrDefault(String value, int defaultValue) {
+ return value != null ? Integer.parseInt(value) : defaultValue;
}
public static Long getStepSeconds(@Nullable String step) {
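To illustrate the parameter handling refactored above: the POST endpoint hands the handler a populated queryParams map straight through, while the existing GET endpoint still passes an empty map plus the raw query string, which is URL-decoded only when the map is empty. A rough sketch of the two equivalent inputs, with placeholder values not taken from this commit:

import java.util.Map;

public class TimeSeriesParamSourcesExample {
  public static void main(String[] args) {
    // POST /query/timeseries: the deserialized JSON body arrives as queryParams.
    Map<String, String> queryParams = Map.of(
        "language", "m3ql",
        "query", "<m3ql query>",
        "start", "1720000000",
        "end", "1720000300",
        "step", "60");
    // GET path: queryParams stays empty, so the raw query string is parsed into the same keys.
    String rawQueryParamString = "query=%3Cm3ql+query%3E&start=1720000000&end=1720000300&step=60";
    System.out.println(queryParams + " | " + rawQueryParamString);
  }
}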
diff --git a/pinot-common/src/main/java/org/apache/pinot/common/response/PinotBrokerTimeSeriesResponse.java b/pinot-common/src/main/java/org/apache/pinot/common/response/PinotBrokerTimeSeriesResponse.java
index ce48337f727..820ac01996b 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/response/PinotBrokerTimeSeriesResponse.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/response/PinotBrokerTimeSeriesResponse.java
@@ -25,11 +25,17 @@ import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Preconditions;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
+import org.apache.pinot.common.response.broker.BrokerResponseNativeV2;
+import org.apache.pinot.common.response.broker.QueryProcessingException;
+import org.apache.pinot.common.response.broker.ResultTable;
+import org.apache.pinot.common.utils.DataSchema;
import org.apache.pinot.spi.annotations.InterfaceStability;
+import org.apache.pinot.spi.exception.QueryErrorCode;
import org.apache.pinot.tsdb.spi.series.TimeSeries;
import org.apache.pinot.tsdb.spi.series.TimeSeriesBlock;
@@ -108,6 +114,61 @@ public class PinotBrokerTimeSeriesResponse {
return convertBucketedSeriesBlock(seriesBlock);
}
+ public BrokerResponse toBrokerResponse() {
+ BrokerResponseNativeV2 brokerResponse = new BrokerResponseNativeV2();
+ if (_errorType != null) {
+ // TODO: Introduce proper error code based exception handling for timeseries.
+ brokerResponse.addException(new QueryProcessingException(QueryErrorCode.UNKNOWN, _errorType + ": "
+ + _error));
+ return brokerResponse;
+ }
+ DataSchema dataSchema = deriveBrokerResponseDataSchema(_data);
+ ResultTable resultTable = new ResultTable(dataSchema, deriveBrokerResponseRows(_data, dataSchema.getColumnNames()));
+ brokerResponse.setResultTable(resultTable);
+ return brokerResponse;
+ }
+
+ private List<Object[]> deriveBrokerResponseRows(Data data, String[] columnNames) {
+ List<Object[]> rows = new ArrayList<>();
+ if (columnNames.length == 0) {
+ return rows;
+ }
+ for (Value value : data.getResult()) {
+ Long[] ts = Arrays.stream(value.getValues()).map(entry -> (Long) entry[0]).toArray(Long[]::new);
+ Double[] values = Arrays.stream(value.getValues())
+ .map(entry -> entry[1] == null ? null : Double.valueOf(String.valueOf(entry[1])))
+ .toArray(Double[]::new);
+
+ Object[] row = new Object[columnNames.length];
+ int index = 0;
+ for (String columnName : columnNames) {
+ if ("ts".equals(columnName)) {
+ row[index] = ts;
+ } else if ("values".equals(columnName)) {
+ row[index] = values;
+ } else {
+ row[index] = value.getMetric().getOrDefault(columnName, null);
+ }
+ index++;
+ }
+ rows.add(row);
+ }
+ return rows;
+ }
+
+ private DataSchema deriveBrokerResponseDataSchema(Data data) {
+ List<String> columnNames = new ArrayList<>(List.of("ts", "values", "__name__"));
+ List<DataSchema.ColumnDataType> columnTypes = new ArrayList<>(List.of(
+ DataSchema.ColumnDataType.LONG_ARRAY, DataSchema.ColumnDataType.DOUBLE_ARRAY));
+ if (!data.getResult().isEmpty()) {
+ data.getResult().get(0).getMetric().forEach((key, value) -> {
+ columnNames.add(key);
+ columnTypes.add(DataSchema.ColumnDataType.STRING);
+ });
+ }
+ return new DataSchema(columnNames.toArray(new String[0]), columnTypes.toArray(new DataSchema.ColumnDataType[0]));
+ }
+
private static PinotBrokerTimeSeriesResponse convertBucketedSeriesBlock(TimeSeriesBlock seriesBlock) {
Long[] timeValues = Objects.requireNonNull(seriesBlock.getTimeBuckets()).getTimeBuckets();
List<PinotBrokerTimeSeriesResponse.Value> result = new ArrayList<>();
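As a rough illustration of the table shape toBrokerResponse() derives above (a ts LONG_ARRAY column, a values DOUBLE_ARRAY column, a __name__ column, plus one STRING column per series label), here is a sketch of walking such a result with Jackson. The JSON literal is a hypothetical example, not output captured from this change.

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;

public class TimeSeriesBrokerResponseExample {
  public static void main(String[] args) throws Exception {
    // Hypothetical BrokerResponse-shaped payload holding a single series.
    String json = "{\"resultTable\": {"
        + "\"dataSchema\": {\"columnNames\": [\"ts\", \"values\", \"__name__\"],"
        + "\"columnDataTypes\": [\"LONG_ARRAY\", \"DOUBLE_ARRAY\", \"STRING\"]},"
        + "\"rows\": [[[1720000000, 1720000060], [5.0, 7.0], \"max\"]]},"
        + "\"numRowsResultSet\": 1}";
    JsonNode row = new ObjectMapper().readTree(json).path("resultTable").path("rows").get(0);
    JsonNode ts = row.get(0);      // bucket timestamps (LONG_ARRAY)
    JsonNode values = row.get(1);  // one value per bucket (DOUBLE_ARRAY)
    for (int i = 0; i < ts.size(); i++) {
      System.out.println(ts.get(i).asLong() + " -> " + values.get(i).asDouble());
    }
  }
}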
diff --git a/pinot-integration-test-base/src/test/java/org/apache/pinot/integration/tests/ClusterTest.java b/pinot-integration-test-base/src/test/java/org/apache/pinot/integration/tests/ClusterTest.java
index 70e131be55d..476003cff72 100644
--- a/pinot-integration-test-base/src/test/java/org/apache/pinot/integration/tests/ClusterTest.java
+++ b/pinot-integration-test-base/src/test/java/org/apache/pinot/integration/tests/ClusterTest.java
@@ -580,6 +580,18 @@ public abstract class ClusterTest extends ControllerTest {
}
}
+ public JsonNode postTimeseriesQuery(String baseUrl, String query, long startTime, long endTime,
+ Map<String, String> headers) {
+ try {
+ Map<String, String> payload = Map.of("language", "m3ql", "query", query, "start",
+ String.valueOf(startTime), "end", String.valueOf(endTime));
+ return JsonUtils.stringToJsonNode(
+ sendPostRequest(baseUrl + "/query/timeseries", JsonUtils.objectToString(payload), headers));
+ } catch (Exception e) {
+ throw new RuntimeException("Failed to post timeseries query: " + query, e);
+ }
+ }
+
/**
* Queries the broker's query endpoint (/query/sql)
*/
diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/TimeSeriesIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/TimeSeriesIntegrationTest.java
index d7490418e16..c472474a845 100644
--- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/TimeSeriesIntegrationTest.java
+++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/TimeSeriesIntegrationTest.java
@@ -19,9 +19,13 @@
package org.apache.pinot.integration.tests;
import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.node.ArrayNode;
import com.google.common.collect.ImmutableList;
import java.io.File;
+import java.util.ArrayList;
import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
import java.util.Map;
import org.apache.avro.file.DataFileWriter;
import org.apache.avro.generic.GenericData;
@@ -32,6 +36,7 @@ import org.apache.pinot.spi.config.table.TableType;
import org.apache.pinot.spi.data.FieldSpec;
import org.apache.pinot.spi.data.Schema;
import org.apache.pinot.spi.env.PinotConfiguration;
+import org.apache.pinot.spi.utils.JsonUtils;
import org.apache.pinot.spi.utils.builder.ControllerRequestURLBuilder;
import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
import org.apache.pinot.tsdb.spi.PinotTimeSeriesConfiguration;
@@ -41,9 +46,11 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
+import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;
import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNotNull;
public class TimeSeriesIntegrationTest extends BaseClusterIntegrationTest {
@@ -62,39 +69,39 @@ public class TimeSeriesIntegrationTest extends BaseClusterIntegrationTest {
private static final long QUERY_START_TIME_SEC = DATA_START_TIME_SEC - 60; // 1 minute before start time
private static final long QUERY_END_TIME_SEC = DATA_START_TIME_SEC + 300; // 5 minutes after start time
- @Test
- public void testGroupByMax() {
+ @Test(dataProvider = "isBrokerResponseCompatible")
+ public void testGroupByMax(boolean isBrokerResponseCompatible) {
String query = String.format(
"fetch{table=\"mytable_OFFLINE\",filter=\"\",ts_column=\"%s\",ts_unit=\"MILLISECONDS\",value=\"%s\"}"
+ " | max{%s} | transformNull{0} | keepLastValue{}",
TS_COLUMN, TOTAL_TRIPS_COLUMN, DEVICE_OS_COLUMN
);
- runGroupedTimeSeriesQuery(query, 3, (ts, val, row) ->
+ runGroupedTimeSeriesQuery(query, 3, isBrokerResponseCompatible, (ts, val, row) ->
assertEquals(val, ts <= DATA_START_TIME_SEC ? 0L : VIEWS_MAX_VALUE)
);
}
- @Test
- public void testGroupByMin() {
+ @Test(dataProvider = "isBrokerResponseCompatible")
+ public void testGroupByMin(boolean isBrokerResponseCompatible) {
String query = String.format(
"fetch{table=\"mytable_OFFLINE\",filter=\"\",ts_column=\"%s\",ts_unit=\"MILLISECONDS\",value=\"%s\"}"
+ " | min{%s} | transformNull{0} | keepLastValue{}",
TS_COLUMN, TOTAL_TRIPS_COLUMN, DAYS_SINCE_FIRST_TRIP_COLUMN
);
- runGroupedTimeSeriesQuery(query, 5, (ts, val, row) ->
+ runGroupedTimeSeriesQuery(query, 5, isBrokerResponseCompatible, (ts, val, row) ->
assertEquals(val, ts <= DATA_START_TIME_SEC ? 0L : VIEWS_MIN_VALUE)
);
}
- @Test
- public void testGroupBySum() {
+ @Test(dataProvider = "isBrokerResponseCompatible")
+ public void testGroupBySum(boolean isBrokerResponseCompatible) {
String query = String.format(
"fetch{table=\"mytable_OFFLINE\",filter=\"\",ts_column=\"%s\",ts_unit=\"MILLISECONDS\",value=\"%s\"}"
+ " | sum{%s} | transformNull{0} | keepLastValue{}",
TS_COLUMN, TOTAL_TRIPS_COLUMN, REFERRAL_COLUMN
);
- runGroupedTimeSeriesQuery(query, 2, (ts, val, row) -> {
- String referral = row.get("metric").get(REFERRAL_COLUMN).asText();
+ runGroupedTimeSeriesQuery(query, 2, isBrokerResponseCompatible, (ts, val, row) -> {
+ String referral = row.get(REFERRAL_COLUMN);
long expected = ts <= DATA_START_TIME_SEC ? 0L
// If referral is true, views are MAX_VALUE, otherwise 20
: "1".equals(referral) ? 30 * VIEWS_MIN_VALUE : 30 * VIEWS_MAX_VALUE;
@@ -102,65 +109,65 @@ public class TimeSeriesIntegrationTest extends BaseClusterIntegrationTest {
});
}
- @Test
- public void testGroupByTwoColumnsAndExpressionValue() {
+ @Test(dataProvider = "isBrokerResponseCompatible")
+ public void testGroupByTwoColumnsAndExpressionValue(boolean isBrokerResponseCompatible) {
String query = String.format(
"fetch{table=\"mytable_OFFLINE\",filter=\"\",ts_column=\"%s\",ts_unit=\"MILLISECONDS\",value=\"%s*10\"}"
+ " | max{%s,%s} | transformNull{0} | keepLastValue{}",
TS_COLUMN, TOTAL_TRIPS_COLUMN, DEVICE_OS_COLUMN, DAYS_SINCE_FIRST_TRIP_COLUMN
);
- runGroupedTimeSeriesQuery(query, 15, (ts, val, row) -> {
+ runGroupedTimeSeriesQuery(query, 15, isBrokerResponseCompatible, (ts, val, row) -> {
long expected = ts <= DATA_START_TIME_SEC ? 0L : 10 * VIEWS_MAX_VALUE;
assertEquals(val, expected);
});
}
- @Test
- public void testGroupByThreeColumnsAndConstantValue() {
+ @Test(dataProvider = "isBrokerResponseCompatible")
+ public void testGroupByThreeColumnsAndConstantValue(boolean isBrokerResponseCompatible) {
String query = String.format(
"fetch{table=\"mytable_OFFLINE\",filter=\"\",ts_column=\"%s\",ts_unit=\"MILLISECONDS\",value=\"1\"}"
+ " | sum{%s,%s,%s} | transformNull{0} | keepLastValue{}",
TS_COLUMN, TOTAL_TRIPS_COLUMN, DEVICE_OS_COLUMN, DAYS_SINCE_FIRST_TRIP_COLUMN, REFERRAL_COLUMN
);
- runGroupedTimeSeriesQuery(query, 30, (ts, val, row) -> {
+ runGroupedTimeSeriesQuery(query, 30, isBrokerResponseCompatible, (ts, val, row) -> {
// Since there are 30 groups, each minute will have 2 rows.
long expected = ts <= DATA_START_TIME_SEC ? 0L : 2L;
assertEquals(val, expected);
});
}
- @Test
- public void testGroupByWithFilter() {
+ @Test(dataProvider = "isBrokerResponseCompatible")
+ public void testGroupByWithFilter(boolean isBrokerResponseCompatible) {
String query = String.format(
"fetch{table=\"mytable_OFFLINE\",filter=\"%s='windows'\",ts_column=\"%s\",ts_unit=\"MILLISECONDS\",value=\"1\"}"
+ " | sum{%s,%s,%s} | transformNull{0} | keepLastValue{}",
DEVICE_OS_COLUMN, TS_COLUMN, TOTAL_TRIPS_COLUMN, DEVICE_OS_COLUMN, DAYS_SINCE_FIRST_TRIP_COLUMN, REFERRAL_COLUMN
);
- runGroupedTimeSeriesQuery(query, 10, (ts, val, row) ->
+ runGroupedTimeSeriesQuery(query, 10, isBrokerResponseCompatible, (ts, val, row) ->
assertEquals(val, ts <= DATA_START_TIME_SEC ? 0L : 2L)
);
}
- @Test
- public void testTransformNull() {
+ @Test(dataProvider = "isBrokerResponseCompatible")
+ public void testTransformNull(boolean isBrokerResponseCompatible) {
String query = String.format(
"fetch{table=\"mytable_OFFLINE\",filter=\"\",ts_column=\"%s\",ts_unit=\"MILLISECONDS\",value=\"%s\"}"
+ " | max{%s} | transformNull{42} | keepLastValue{}",
TS_COLUMN, TOTAL_TRIPS_COLUMN, DEVICE_OS_COLUMN
);
- runGroupedTimeSeriesQuery(query, 3, (ts, val, row) ->
+ runGroupedTimeSeriesQuery(query, 3, isBrokerResponseCompatible, (ts, val, row) ->
assertEquals(val, ts <= DATA_START_TIME_SEC ? 42L : VIEWS_MAX_VALUE)
);
}
- @Test
- public void testTableWithoutType() {
+ @Test(dataProvider = "isBrokerResponseCompatible")
+ public void testTableWithoutType(boolean isBrokerResponseCompatible) {
String query = String.format(
"fetch{table=\"mytable\",filter=\"\",ts_column=\"%s\",ts_unit=\"MILLISECONDS\",value=\"%s\"}"
+ " | max{%s} | transformNull{0} | keepLastValue{}",
TS_COLUMN, TOTAL_TRIPS_COLUMN, DEVICE_OS_COLUMN
);
- runGroupedTimeSeriesQuery(query, 3, (ts, val, row) ->
+ runGroupedTimeSeriesQuery(query, 3, isBrokerResponseCompatible, (ts, val, row) ->
assertEquals(val, ts <= DATA_START_TIME_SEC ? 0L : VIEWS_MAX_VALUE)
);
}
@@ -198,27 +205,86 @@ public class TimeSeriesIntegrationTest extends BaseClusterIntegrationTest {
assertEquals(statusCodeAndResponse.getRight(), "[\"m3ql\"]");
}
+ @DataProvider(name = "isBrokerResponseCompatible")
+ public Object[][] isBrokerResponseCompatible() {
+ return new Object[][]{
+ {false}, {true}
+ };
+ }
+
protected Map<String, String> getHeaders() {
return Collections.emptyMap();
}
- private void runGroupedTimeSeriesQuery(String query, int expectedGroups, TimeSeriesValidator validator) {
- JsonNode result = getTimeseriesQuery(query, QUERY_START_TIME_SEC, QUERY_END_TIME_SEC, getHeaders());
- System.out.println(result);
- assertEquals(result.get("status").asText(), "success");
+ private void runGroupedTimeSeriesQuery(String query, int expectedGroups, boolean isBrokerResponseCompatible,
+ TimeSeriesValidator validator) {
+ JsonNode result = isBrokerResponseCompatible
+ ? postTimeseriesQuery(getBrokerBaseApiUrl(), query, QUERY_START_TIME_SEC, QUERY_END_TIME_SEC, getHeaders())
+ : getTimeseriesQuery(query, QUERY_START_TIME_SEC, QUERY_END_TIME_SEC, getHeaders());
- JsonNode series = result.get("data").get("result");
- assertEquals(series.size(), expectedGroups);
+ if (isBrokerResponseCompatible) {
+ validateBrokerResponse(result, expectedGroups, validator);
+ } else {
+ validatePrometheusResponse(result, expectedGroups, validator);
+ }
+ }
+
+ private void validatePrometheusResponse(JsonNode result, int expectedGroups, TimeSeriesValidator validator) {
+ assertEquals("success", result.path("status").asText());
+
+ JsonNode series = result.path("data").path("result");
+ assertEquals(expectedGroups, series.size());
for (JsonNode row : series) {
- for (JsonNode point : row.get("values")) {
+ Map<String, String> metric;
+ try {
+ metric = JsonUtils.jsonNodeToStringMap(row.path("metric"));
+ } catch (Exception e) {
+ throw new RuntimeException("Failed to parse metric from row", e);
+ }
+ for (JsonNode point : row.path("values")) {
long ts = point.get(0).asLong();
long val = point.get(1).asLong();
- validator.validate(ts, val, row);
+ validator.validate(ts, val, metric);
}
}
}
+ private void validateBrokerResponse(JsonNode result, int expectedGroups, TimeSeriesValidator validator) {
+ assertNotNull(result);
+ assertEquals(expectedGroups, result.path("numRowsResultSet").asInt());
+
+ JsonNode resultTable = result.path("resultTable");
+ assertNotNull(resultTable);
+
+ List<String> columnNames = extractStrings(resultTable.path("dataSchema").path("columnNames"));
+ JsonNode rows = resultTable.path("rows");
+
+ for (JsonNode jsonRow : rows) {
+ ArrayNode row = (ArrayNode) jsonRow;
+ assertEquals(columnNames.size(), row.size());
+
+ ArrayNode tsArray = (ArrayNode) row.get(0);
+ ArrayNode valArray = (ArrayNode) row.get(1);
+ assertEquals(tsArray.size(), valArray.size());
+
+ Map<String, String> metric = new HashMap<>();
+ for (int i = 2; i < row.size(); i++) {
+ metric.put(columnNames.get(i), row.get(i).asText());
+ }
+
+ for (int i = 0; i < tsArray.size(); i++) {
+ validator.validate(tsArray.get(i).asLong(), valArray.get(i).asLong(), metric);
+ }
+ }
+ }
+
+ private List<String> extractStrings(JsonNode arrayNode) {
+ List<String> result = new ArrayList<>();
+ arrayNode.forEach(node -> result.add(node.asText()));
+ return result;
+ }
+
@Override
protected void overrideControllerConf(Map<String, Object> properties) {
properties.put(PinotTimeSeriesConfiguration.getEnabledLanguagesConfigKey(), "m3ql");
@@ -359,6 +425,6 @@ public class TimeSeriesIntegrationTest extends BaseClusterIntegrationTest {
@FunctionalInterface
interface TimeSeriesValidator {
- void validate(long timestamp, long value, JsonNode row);
+ void validate(long timestamp, long value, Map<String, String> metricMap);
}
}
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
index 3ab38c56a03..82b18210791 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
@@ -556,6 +556,8 @@ public class CommonConstants {
public static final String SQL_V2 = "sqlV2";
public static final String TRACE = "trace";
public static final String QUERY_OPTIONS = "queryOptions";
+ public static final String LANGUAGE = "language";
+ public static final String QUERY = "query";
public static class QueryOptionKey {
public static final String TIMEOUT_MS = "timeoutMs";
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/JsonUtils.java b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/JsonUtils.java
index 6ae9d6670ee..535cccdebb2 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/JsonUtils.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/JsonUtils.java
@@ -158,6 +158,11 @@ public class JsonUtils {
return DEFAULT_READER.readTree(jsonString);
}
+ public static Map<String, String> jsonNodeToStringMap(JsonNode jsonNode)
+ throws IOException {
+ return DEFAULT_READER.forType(MAP_TYPE_REFERENCE).readValue(jsonNode);
+ }
+
public static JsonNode stringToJsonNodeWithBigDecimal(String jsonString)
throws IOException {
return READER_WITH_BIG_DECIMAL.readTree(jsonString);
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]