This is an automated email from the ASF dual-hosted git repository.
ankitsultana pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new c3ce1cff782 [timeseries] Returning TimeSeriesBlock to API handlers to
eliminate extraneous transformation (#16641)
c3ce1cff782 is described below
commit c3ce1cff782a2ea479ef2ef6040c76cd4034998c
Author: Shaurya Chaturvedi <[email protected]>
AuthorDate: Fri Aug 29 07:39:53 2025 +0530
[timeseries] Returning TimeSeriesBlock to API handlers to eliminate
extraneous transformation (#16641)
Co-authored-by: Shaurya Chaturvedi <[email protected]>
---
.../broker/api/resources/PinotClientRequest.java | 22 +--
.../requesthandler/BrokerRequestHandler.java | 7 +-
.../BrokerRequestHandlerDelegate.java | 9 +-
.../requesthandler/TimeSeriesRequestHandler.java | 35 ++---
.../response/PinotBrokerTimeSeriesResponse.java | 64 +--------
.../response/mapper/TimeSeriesResponseMapper.java | 126 +++++++++++++++++
.../mapper/TimeSeriesResponseMapperTest.java | 157 +++++++++++++++++++++
.../query/service/dispatch/QueryDispatcher.java | 14 +-
.../apache/pinot/spi/exception/QueryErrorCode.java | 1 +
9 files changed, 332 insertions(+), 103 deletions(-)
diff --git
a/pinot-broker/src/main/java/org/apache/pinot/broker/api/resources/PinotClientRequest.java
b/pinot-broker/src/main/java/org/apache/pinot/broker/api/resources/PinotClientRequest.java
index dc49e0aec73..ee9dfbe5097 100644
---
a/pinot-broker/src/main/java/org/apache/pinot/broker/api/resources/PinotClientRequest.java
+++
b/pinot-broker/src/main/java/org/apache/pinot/broker/api/resources/PinotClientRequest.java
@@ -68,6 +68,7 @@ import org.apache.pinot.common.response.BrokerResponse;
import org.apache.pinot.common.response.PinotBrokerTimeSeriesResponse;
import org.apache.pinot.common.response.broker.BrokerResponseNative;
import org.apache.pinot.common.response.broker.QueryProcessingException;
+import org.apache.pinot.common.response.mapper.TimeSeriesResponseMapper;
import org.apache.pinot.common.utils.DataSchema;
import org.apache.pinot.common.utils.request.RequestUtils;
import org.apache.pinot.core.auth.Actions;
@@ -81,6 +82,7 @@ import
org.apache.pinot.core.query.request.context.utils.QueryContextUtils;
import org.apache.pinot.spi.auth.broker.RequesterIdentity;
import org.apache.pinot.spi.env.PinotConfiguration;
import org.apache.pinot.spi.exception.QueryErrorCode;
+import org.apache.pinot.spi.exception.QueryException;
import org.apache.pinot.spi.query.QueryThreadContext;
import org.apache.pinot.spi.trace.RequestContext;
import org.apache.pinot.spi.trace.RequestScope;
@@ -90,6 +92,7 @@ import
org.apache.pinot.spi.utils.CommonConstants.Broker.Request;
import org.apache.pinot.spi.utils.JsonUtils;
import org.apache.pinot.sql.parsers.PinotSqlType;
import org.apache.pinot.sql.parsers.SqlNodeAndOptions;
+import org.apache.pinot.tsdb.spi.series.TimeSeriesBlock;
import org.glassfish.jersey.server.ManagedAsync;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -295,10 +298,12 @@ public class PinotClientRequest {
String language = queryParams.get(Request.LANGUAGE);
String queryString = queryParams.get(Request.QUERY);
try (RequestScope requestContext =
Tracing.getTracer().createRequestScope()) {
- PinotBrokerTimeSeriesResponse response =
executeTimeSeriesQuery(language, queryString, queryParams,
+ TimeSeriesBlock timeSeriesBlock = executeTimeSeriesQuery(language,
queryString, queryParams,
requestContext, makeHttpIdentity(requestCtx), httpHeaders);
- asyncResponse.resume(response.toBrokerResponse());
+
asyncResponse.resume(TimeSeriesResponseMapper.toBrokerResponse(timeSeriesBlock));
}
+ } catch (QueryException e) {
+ asyncResponse.resume(TimeSeriesResponseMapper.toBrokerResponse(e));
} catch (Exception e) {
LOGGER.error("Caught exception while processing POST timeseries
request", e);
_brokerMetrics.addMeteredGlobalValue(BrokerMeter.UNCAUGHT_POST_EXCEPTIONS, 1L);
@@ -320,20 +325,21 @@ public class PinotClientRequest {
try {
try (RequestScope requestContext =
Tracing.getTracer().createRequestScope()) {
String queryString = requestCtx.getQueryString();
- PinotBrokerTimeSeriesResponse response =
executeTimeSeriesQuery(language, queryString, Map.of(), requestContext,
+ TimeSeriesBlock timeSeriesBlock = executeTimeSeriesQuery(language,
queryString, Map.of(), requestContext,
makeHttpIdentity(requestCtx), httpHeaders);
+ PinotBrokerTimeSeriesResponse response =
PinotBrokerTimeSeriesResponse.fromTimeSeriesBlock(timeSeriesBlock);
if (response.getErrorType() != null &&
!response.getErrorType().isEmpty()) {
asyncResponse.resume(Response.serverError().entity(response).build());
return;
}
asyncResponse.resume(response);
}
+ } catch (QueryException e) {
+ asyncResponse.resume(PinotBrokerTimeSeriesResponse.fromException(e));
} catch (Exception e) {
LOGGER.error("Caught exception while processing GET request", e);
_brokerMetrics.addMeteredGlobalValue(BrokerMeter.UNCAUGHT_POST_EXCEPTIONS, 1L);
- asyncResponse.resume(Response.serverError().entity(
- new PinotBrokerTimeSeriesResponse("error", null,
e.getClass().getSimpleName(), e.getMessage()))
- .build());
+ asyncResponse.resume(PinotBrokerTimeSeriesResponse.fromException(e));
}
}
@@ -564,9 +570,9 @@ public class PinotClientRequest {
}
}
- private PinotBrokerTimeSeriesResponse executeTimeSeriesQuery(String
language, String queryString,
+ private TimeSeriesBlock executeTimeSeriesQuery(String language, String
queryString,
Map<String, String> queryParams, RequestContext requestContext,
RequesterIdentity requesterIdentity,
- HttpHeaders httpHeaders) {
+ HttpHeaders httpHeaders) throws QueryException {
return _requestHandler.handleTimeSeriesRequest(language, queryString,
queryParams, requestContext,
requesterIdentity, httpHeaders);
}
diff --git
a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BrokerRequestHandler.java
b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BrokerRequestHandler.java
index fbab6bbf8e6..71ea2d3cc9f 100644
---
a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BrokerRequestHandler.java
+++
b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BrokerRequestHandler.java
@@ -29,14 +29,15 @@ import javax.annotation.concurrent.ThreadSafe;
import javax.ws.rs.core.HttpHeaders;
import org.apache.hc.client5.http.io.HttpClientConnectionManager;
import org.apache.pinot.common.response.BrokerResponse;
-import org.apache.pinot.common.response.PinotBrokerTimeSeriesResponse;
import org.apache.pinot.spi.auth.broker.RequesterIdentity;
+import org.apache.pinot.spi.exception.QueryException;
import org.apache.pinot.spi.trace.RequestContext;
import org.apache.pinot.spi.trace.RequestScope;
import org.apache.pinot.spi.trace.Tracing;
import org.apache.pinot.spi.utils.CommonConstants.Broker.Request;
import org.apache.pinot.spi.utils.JsonUtils;
import org.apache.pinot.sql.parsers.SqlNodeAndOptions;
+import org.apache.pinot.tsdb.spi.series.TimeSeriesBlock;
@ThreadSafe
@@ -64,9 +65,9 @@ public interface BrokerRequestHandler {
/**
* Run a query and use the time-series engine.
*/
- default PinotBrokerTimeSeriesResponse handleTimeSeriesRequest(String lang,
String rawQueryParamString,
+ default TimeSeriesBlock handleTimeSeriesRequest(String lang, String
rawQueryParamString,
Map<String, String> queryParams, RequestContext requestContext,
@Nullable RequesterIdentity requesterIdentity,
- HttpHeaders httpHeaders) {
+ HttpHeaders httpHeaders) throws QueryException {
throw new UnsupportedOperationException("Handler does not support Time
Series requests");
}
diff --git
a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BrokerRequestHandlerDelegate.java
b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BrokerRequestHandlerDelegate.java
index 730fe380b71..9fe1936e154 100644
---
a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BrokerRequestHandlerDelegate.java
+++
b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BrokerRequestHandlerDelegate.java
@@ -29,15 +29,16 @@ import
org.apache.hc.client5.http.io.HttpClientConnectionManager;
import org.apache.pinot.common.cursors.AbstractResponseStore;
import org.apache.pinot.common.response.BrokerResponse;
import org.apache.pinot.common.response.CursorResponse;
-import org.apache.pinot.common.response.PinotBrokerTimeSeriesResponse;
import org.apache.pinot.common.response.broker.BrokerResponseNative;
import org.apache.pinot.common.utils.config.QueryOptionsUtils;
import org.apache.pinot.common.utils.request.RequestUtils;
import org.apache.pinot.spi.auth.broker.RequesterIdentity;
import org.apache.pinot.spi.exception.QueryErrorCode;
+import org.apache.pinot.spi.exception.QueryException;
import org.apache.pinot.spi.trace.RequestContext;
import org.apache.pinot.spi.utils.CommonConstants.Broker.Request;
import org.apache.pinot.sql.parsers.SqlNodeAndOptions;
+import org.apache.pinot.tsdb.spi.series.TimeSeriesBlock;
/**
@@ -124,14 +125,14 @@ public class BrokerRequestHandlerDelegate implements
BrokerRequestHandler {
}
@Override
- public PinotBrokerTimeSeriesResponse handleTimeSeriesRequest(String lang,
String rawQueryParamString,
+ public TimeSeriesBlock handleTimeSeriesRequest(String lang, String
rawQueryParamString,
Map<String, String> queryParams, RequestContext requestContext,
RequesterIdentity requesterIdentity,
- HttpHeaders httpHeaders) {
+ HttpHeaders httpHeaders) throws QueryException {
if (_timeSeriesRequestHandler != null) {
return _timeSeriesRequestHandler.handleTimeSeriesRequest(lang,
rawQueryParamString, queryParams, requestContext,
requesterIdentity, httpHeaders);
}
- return new PinotBrokerTimeSeriesResponse("error", null, "error", "Time
series query engine not enabled.");
+ throw new QueryException(QueryErrorCode.INTERNAL, "Time series query
engine not enabled.");
}
@Override
diff --git
a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/TimeSeriesRequestHandler.java
b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/TimeSeriesRequestHandler.java
index b7c93bb4c1d..65e04288438 100644
---
a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/TimeSeriesRequestHandler.java
+++
b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/TimeSeriesRequestHandler.java
@@ -43,7 +43,6 @@ import org.apache.pinot.common.config.provider.TableCache;
import org.apache.pinot.common.metrics.BrokerMeter;
import org.apache.pinot.common.metrics.BrokerTimer;
import org.apache.pinot.common.response.BrokerResponse;
-import org.apache.pinot.common.response.PinotBrokerTimeSeriesResponse;
import org.apache.pinot.common.utils.HumanReadableDuration;
import org.apache.pinot.core.auth.Actions;
import org.apache.pinot.core.auth.TargetType;
@@ -52,12 +51,15 @@ import
org.apache.pinot.spi.accounting.ThreadResourceUsageAccountant;
import org.apache.pinot.spi.auth.AuthorizationResult;
import org.apache.pinot.spi.auth.broker.RequesterIdentity;
import org.apache.pinot.spi.env.PinotConfiguration;
+import org.apache.pinot.spi.exception.QueryErrorCode;
+import org.apache.pinot.spi.exception.QueryException;
import org.apache.pinot.spi.trace.RequestContext;
import org.apache.pinot.sql.parsers.SqlNodeAndOptions;
import org.apache.pinot.tsdb.planner.TimeSeriesQueryEnvironment;
import org.apache.pinot.tsdb.planner.physical.TimeSeriesDispatchablePlan;
import org.apache.pinot.tsdb.spi.RangeTimeSeriesRequest;
import org.apache.pinot.tsdb.spi.TimeSeriesLogicalPlanResult;
+import org.apache.pinot.tsdb.spi.series.TimeSeriesBlock;
import org.apache.pinot.tsdb.spi.series.TimeSeriesBuilderFactoryProvider;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -107,10 +109,10 @@ public class TimeSeriesRequestHandler extends
BaseBrokerRequestHandler {
}
@Override
- public PinotBrokerTimeSeriesResponse handleTimeSeriesRequest(String lang,
String rawQueryParamString,
+ public TimeSeriesBlock handleTimeSeriesRequest(String lang, String
rawQueryParamString,
Map<String, String> queryParams, RequestContext requestContext,
RequesterIdentity requesterIdentity,
- HttpHeaders httpHeaders) {
- PinotBrokerTimeSeriesResponse timeSeriesResponse = null;
+ HttpHeaders httpHeaders) throws QueryException {
+ TimeSeriesBlock timeSeriesBlock = null;
long queryStartTime = System.currentTimeMillis();
try {
_brokerMetrics.addMeteredGlobalValue(BrokerMeter.TIME_SERIES_GLOBAL_QUERIES, 1);
@@ -121,31 +123,30 @@ public class TimeSeriesRequestHandler extends
BaseBrokerRequestHandler {
try {
timeSeriesRequest = buildRangeTimeSeriesRequest(lang,
rawQueryParamString, queryParams);
} catch (URISyntaxException e) {
- return PinotBrokerTimeSeriesResponse.newErrorResponse("BAD_REQUEST",
"Error building RangeTimeSeriesRequest");
+ throw new QueryException(QueryErrorCode.TIMESERIES_PARSING, "Error
building RangeTimeSeriesRequest", e);
}
TimeSeriesLogicalPlanResult logicalPlanResult =
_queryEnvironment.buildLogicalPlan(timeSeriesRequest);
// If there are no buckets in the logical plan, return an empty response.
if (logicalPlanResult.getTimeBuckets().getNumBuckets() == 0) {
- return PinotBrokerTimeSeriesResponse.newEmptyResponse();
+ return new TimeSeriesBlock(logicalPlanResult.getTimeBuckets(), new
HashMap<>());
}
TimeSeriesDispatchablePlan dispatchablePlan =
_queryEnvironment.buildPhysicalPlan(timeSeriesRequest,
requestContext, logicalPlanResult);
tableLevelAccessControlCheck(httpHeaders,
dispatchablePlan.getTableNames());
- timeSeriesResponse = _queryDispatcher.submitAndGet(requestContext,
dispatchablePlan,
- timeSeriesRequest.getTimeout().toMillis(), new HashMap<>());
- return timeSeriesResponse;
+ timeSeriesBlock =
_queryDispatcher.submitAndGet(requestContext.getRequestId(), dispatchablePlan,
+ timeSeriesRequest.getTimeout().toMillis(), new HashMap<>(),
requestContext);
+ return timeSeriesBlock;
+ } catch (Exception e) {
+
_brokerMetrics.addMeteredGlobalValue(BrokerMeter.TIME_SERIES_GLOBAL_QUERIES_FAILED,
1);
+ if (e instanceof QueryException) {
+ throw (QueryException) e;
+ } else {
+ throw new QueryException(QueryErrorCode.UNKNOWN, "Error processing
time-series query", e);
+ }
} finally {
_brokerMetrics.addTimedValue(BrokerTimer.QUERY_TOTAL_TIME_MS,
System.currentTimeMillis() - queryStartTime,
TimeUnit.MILLISECONDS);
- if (timeSeriesResponse == null
- ||
timeSeriesResponse.getStatus().equals(PinotBrokerTimeSeriesResponse.ERROR_STATUS))
{
-
_brokerMetrics.addMeteredGlobalValue(BrokerMeter.TIME_SERIES_GLOBAL_QUERIES_FAILED,
1);
- final String errorMessage = timeSeriesResponse == null ? "null
time-series response"
- : timeSeriesResponse.getError();
- // TODO(timeseries): Remove logging for failed queries.
- LOGGER.warn("time-series query failed with error: {}", errorMessage);
- }
}
}
diff --git
a/pinot-common/src/main/java/org/apache/pinot/common/response/PinotBrokerTimeSeriesResponse.java
b/pinot-common/src/main/java/org/apache/pinot/common/response/PinotBrokerTimeSeriesResponse.java
index 820ac01996b..3198d752c30 100644
---
a/pinot-common/src/main/java/org/apache/pinot/common/response/PinotBrokerTimeSeriesResponse.java
+++
b/pinot-common/src/main/java/org/apache/pinot/common/response/PinotBrokerTimeSeriesResponse.java
@@ -25,17 +25,12 @@ import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Preconditions;
import java.util.ArrayList;
-import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
-import org.apache.pinot.common.response.broker.BrokerResponseNativeV2;
-import org.apache.pinot.common.response.broker.QueryProcessingException;
-import org.apache.pinot.common.response.broker.ResultTable;
-import org.apache.pinot.common.utils.DataSchema;
import org.apache.pinot.spi.annotations.InterfaceStability;
-import org.apache.pinot.spi.exception.QueryErrorCode;
+import org.apache.pinot.spi.exception.QueryException;
import org.apache.pinot.tsdb.spi.series.TimeSeries;
import org.apache.pinot.tsdb.spi.series.TimeSeriesBlock;
@@ -114,59 +109,12 @@ public class PinotBrokerTimeSeriesResponse {
return convertBucketedSeriesBlock(seriesBlock);
}
- public BrokerResponse toBrokerResponse() {
- BrokerResponseNativeV2 brokerResponse = new BrokerResponseNativeV2();
- if (_errorType != null) {
- // TODO: Introduce proper error code based exception handling for
timeseries.
- brokerResponse.addException(new
QueryProcessingException(QueryErrorCode.UNKNOWN, _errorType + ": "
- + _error));
- return brokerResponse;
+ public static PinotBrokerTimeSeriesResponse fromException(Exception e) {
+ if (e instanceof QueryException) {
+ QueryException qe = (QueryException) e;
+ return newErrorResponse(qe.getErrorCode().getDefaultMessage(),
e.getMessage());
}
- DataSchema dataSchema = deriveBrokerResponseDataSchema(_data);
- ResultTable resultTable = new ResultTable(dataSchema,
deriveBrokerResponseRows(_data, dataSchema.getColumnNames()));
- brokerResponse.setResultTable(resultTable);
- return brokerResponse;
- }
-
- private List<Object[]> deriveBrokerResponseRows(Data data, String[]
columnNames) {
- List<Object[]> rows = new ArrayList<>();
- if (columnNames.length == 0) {
- return rows;
- }
- for (Value value : data.getResult()) {
- Long[] ts = Arrays.stream(value.getValues()).map(entry -> (Long)
entry[0]).toArray(Long[]::new);
- Double[] values = Arrays.stream(value.getValues())
- .map(entry -> entry[1] == null ? null :
Double.valueOf(String.valueOf(entry[1])))
- .toArray(Double[]::new);
-
- Object[] row = new Object[columnNames.length];
- int index = 0;
- for (String columnName : columnNames) {
- if ("ts".equals(columnName)) {
- row[index] = ts;
- } else if ("values".equals(columnName)) {
- row[index] = values;
- } else {
- row[index] = value.getMetric().getOrDefault(columnName, null);
- }
- index++;
- }
- rows.add(row);
- }
- return rows;
- }
-
- private DataSchema deriveBrokerResponseDataSchema(Data data) {
- List<String> columnNames = new ArrayList<>(List.of("ts", "values",
"__name__"));
- List<DataSchema.ColumnDataType> columnTypes = new ArrayList<>(List.of(
- DataSchema.ColumnDataType.LONG_ARRAY,
DataSchema.ColumnDataType.DOUBLE_ARRAY));
- if (!data.getResult().isEmpty()) {
- data.getResult().get(0).getMetric().forEach((key, value) -> {
- columnNames.add(key);
- columnTypes.add(DataSchema.ColumnDataType.STRING);
- });
- }
- return new DataSchema(columnNames.toArray(new String[0]),
columnTypes.toArray(new DataSchema.ColumnDataType[0]));
+ return newErrorResponse(e.getClass().getSimpleName(), e.getMessage());
}
private static PinotBrokerTimeSeriesResponse
convertBucketedSeriesBlock(TimeSeriesBlock seriesBlock) {
diff --git
a/pinot-common/src/main/java/org/apache/pinot/common/response/mapper/TimeSeriesResponseMapper.java
b/pinot-common/src/main/java/org/apache/pinot/common/response/mapper/TimeSeriesResponseMapper.java
new file mode 100644
index 00000000000..714dd264456
--- /dev/null
+++
b/pinot-common/src/main/java/org/apache/pinot/common/response/mapper/TimeSeriesResponseMapper.java
@@ -0,0 +1,126 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.common.response.mapper;
+
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.pinot.common.response.BrokerResponse;
+import org.apache.pinot.common.response.broker.BrokerResponseNativeV2;
+import org.apache.pinot.common.response.broker.QueryProcessingException;
+import org.apache.pinot.common.response.broker.ResultTable;
+import org.apache.pinot.common.utils.DataSchema;
+import org.apache.pinot.spi.exception.QueryException;
+import org.apache.pinot.tsdb.spi.series.TimeSeries;
+import org.apache.pinot.tsdb.spi.series.TimeSeriesBlock;
+
+
+public class TimeSeriesResponseMapper {
+
+ private static final String TS_COLUMN = "ts";
+ private static final String VALUES_COLUMN = "values";
+ private static final String NAME_COLUMN = "__name__";
+
+ private TimeSeriesResponseMapper() {
+ }
+
+ /**
+ * Creates a BrokerResponseNativeV2 from a TimeSeriesBlock.
+ * This method converts the time series data into a format compatible with
the broker response.
+ */
+ public static BrokerResponse toBrokerResponse(TimeSeriesBlock
timeSeriesBlock) {
+ BrokerResponseNativeV2 brokerResponse = new BrokerResponseNativeV2();
+ if (timeSeriesBlock == null) {
+ throw new IllegalArgumentException("timeSeriesBlock must not be null");
+ }
+ if (timeSeriesBlock.getTimeBuckets() == null) {
+ throw new UnsupportedOperationException("Non-bucketed series block not
supported yet");
+ }
+ // Convert TimeSeriesBlock to ResultTable format
+ DataSchema dataSchema =
deriveDataSchemaFromTimeSeriesBlock(timeSeriesBlock);
+ List<Object[]> rows = deriveRowsFromTimeSeriesBlock(timeSeriesBlock,
dataSchema.getColumnNames());
+
+ ResultTable resultTable = new ResultTable(dataSchema, rows);
+ brokerResponse.setResultTable(resultTable);
+ return brokerResponse;
+ }
+
+ public static BrokerResponse toBrokerResponse(QueryException e) {
+ BrokerResponseNativeV2 brokerResponse = new BrokerResponseNativeV2();
+ brokerResponse.addException(new QueryProcessingException(e.getErrorCode(),
e.getMessage()));
+ return brokerResponse;
+ }
+
+ private static DataSchema
deriveDataSchemaFromTimeSeriesBlock(TimeSeriesBlock timeSeriesBlock) {
+ List<String> columnNames = new ArrayList<>(List.of(TS_COLUMN,
VALUES_COLUMN, NAME_COLUMN));
+ List<DataSchema.ColumnDataType> columnTypes = new ArrayList<>(List.of(
+ DataSchema.ColumnDataType.LONG_ARRAY,
DataSchema.ColumnDataType.DOUBLE_ARRAY,
+ DataSchema.ColumnDataType.STRING));
+
+ // Add tag columns if any series exist
+ if (!timeSeriesBlock.getSeriesMap().isEmpty()) {
+ // Get the first series to determine tag columns
+ TimeSeries firstSeries =
timeSeriesBlock.getSeriesMap().values().iterator().next().get(0);
+ firstSeries.getTagKeyValuesAsMap().forEach((key, value) -> {
+ if (!columnNames.contains(key)) {
+ columnNames.add(key);
+ columnTypes.add(DataSchema.ColumnDataType.STRING);
+ }
+ });
+ }
+
+ return new DataSchema(columnNames.toArray(new String[0]),
+ columnTypes.toArray(new DataSchema.ColumnDataType[0]));
+ }
+
+ private static List<Object[]> deriveRowsFromTimeSeriesBlock(TimeSeriesBlock
timeSeriesBlock,
+ String[] columnNames) {
+ List<Object[]> rows = new ArrayList<>();
+ if (columnNames.length == 0) {
+ return rows;
+ }
+
+ Long[] timeValues = timeSeriesBlock.getTimeBuckets().getTimeBuckets();
+ for (var listOfTimeSeries : timeSeriesBlock.getSeriesMap().values()) {
+ for (TimeSeries timeSeries : listOfTimeSeries) {
+ Object[] row = new Object[columnNames.length];
+ int index = 0;
+
+ for (String columnName : columnNames) {
+ if (TS_COLUMN.equals(columnName)) {
+ row[index] = timeValues;
+ } else if (VALUES_COLUMN.equals(columnName)) {
+ Double[] values = new Double[timeValues.length];
+ for (int i = 0; i < timeValues.length; i++) {
+ Object nullableValue = timeSeries.getDoubleValues()[i];
+ values[i] = nullableValue == null ? null :
Double.valueOf(String.valueOf(nullableValue));
+ }
+ row[index] = values;
+ } else if (NAME_COLUMN.equals(columnName)) {
+ row[index] = timeSeries.getTagsSerialized();
+ } else {
+ row[index] =
timeSeries.getTagKeyValuesAsMap().getOrDefault(columnName, null);
+ }
+ index++;
+ }
+ rows.add(row);
+ }
+ }
+ return rows;
+ }
+}
diff --git
a/pinot-common/src/test/java/org/apache/pinot/common/response/mapper/TimeSeriesResponseMapperTest.java
b/pinot-common/src/test/java/org/apache/pinot/common/response/mapper/TimeSeriesResponseMapperTest.java
new file mode 100644
index 00000000000..49afff1df7d
--- /dev/null
+++
b/pinot-common/src/test/java/org/apache/pinot/common/response/mapper/TimeSeriesResponseMapperTest.java
@@ -0,0 +1,157 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.common.response.mapper;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import org.apache.pinot.common.response.broker.BrokerResponseNativeV2;
+import org.apache.pinot.common.response.broker.QueryProcessingException;
+import org.apache.pinot.common.response.broker.ResultTable;
+import org.apache.pinot.common.utils.DataSchema;
+import org.apache.pinot.spi.exception.QueryErrorCode;
+import org.apache.pinot.spi.exception.QueryException;
+import org.apache.pinot.tsdb.spi.TimeBuckets;
+import org.apache.pinot.tsdb.spi.series.TimeSeries;
+import org.apache.pinot.tsdb.spi.series.TimeSeriesBlock;
+import org.mockito.Mock;
+import org.mockito.MockitoAnnotations;
+import org.testng.annotations.BeforeTest;
+import org.testng.annotations.Test;
+
+import static org.mockito.Mockito.when;
+import static org.testng.Assert.*;
+
+
+public class TimeSeriesResponseMapperTest {
+
+ @Mock TimeSeriesBlock _block;
+ @Mock TimeBuckets _timeBuckets;
+
+ @BeforeTest
+ public void setup() {
+ MockitoAnnotations.openMocks(this);
+ }
+
+ @Test(expectedExceptions = IllegalArgumentException.class)
+ public void toBrokerResponseNullBlockThrows() {
+ TimeSeriesResponseMapper.toBrokerResponse((TimeSeriesBlock) null);
+ }
+
+ @Test(expectedExceptions = UnsupportedOperationException.class)
+ public void toBrokerResponseNonBucketedThrows() {
+ when(_block.getTimeBuckets()).thenReturn(null);
+ TimeSeriesResponseMapper.toBrokerResponse(_block);
+ }
+
+ @Test
+ public void toBrokerResponseWithException() {
+ BrokerResponseNativeV2 resp = (BrokerResponseNativeV2)
TimeSeriesResponseMapper.toBrokerResponse(
+ new QueryException(QueryErrorCode.INTERNAL, "time series exception"));
+ List<QueryProcessingException> exceptions = resp.getExceptions();
+ assertEquals(exceptions.size(), 1);
+ assertEquals(exceptions.get(0).getErrorCode(),
QueryErrorCode.INTERNAL.getId());
+ }
+
+ @Test
+ public void toBrokerResponseWithEmptySeriesEmptyRowsBaseSchema() {
+ when(_block.getSeriesMap()).thenReturn(Collections.emptyMap());
+ when(_timeBuckets.getTimeBuckets()).thenReturn(new Long[]{100L, 200L,
300L});
+ when(_block.getTimeBuckets()).thenReturn(_timeBuckets);
+
+ BrokerResponseNativeV2 resp = (BrokerResponseNativeV2)
TimeSeriesResponseMapper.toBrokerResponse(_block);
+ ResultTable table = resp.getResultTable();
+ assertNotNull(table);
+
+ DataSchema schema = table.getDataSchema();
+ assertNotNull(schema);
+ assertEqualsNoOrder(schema.getColumnNames(), new String[]{"ts", "values",
"__name__"});
+ assertEquals(schema.getColumnDataTypes()[0],
DataSchema.ColumnDataType.LONG_ARRAY);
+ assertEquals(schema.getColumnDataTypes()[1],
DataSchema.ColumnDataType.DOUBLE_ARRAY);
+ assertEquals(schema.getColumnDataTypes()[2],
DataSchema.ColumnDataType.STRING);
+
+ assertTrue(table.getRows().isEmpty(), "Expected no rows for empty
seriesMap");
+ }
+
+ @Test
+ public void toBrokerResponseForSchemaAndTagsOnlyFromFirstSeries() {
+ when(_timeBuckets.getTimeBuckets()).thenReturn(new Long[]{10L, 20L});
+ when(_block.getTimeBuckets()).thenReturn(_timeBuckets);
+ // first series decides tag columns: region, host
+ TimeSeries s1 = new TimeSeries(
+ "id1",
+ null,
+ _timeBuckets,
+ new Double[]{1.0, 2.0},
+ Arrays.asList("region", "host"),
+ new Object[]{"us-west", "h1"}
+ );
+ // second series in the same list, carrying a different extra tag "zone"
+ TimeSeries s2 = new TimeSeries(
+ "id2",
+ null,
+ _timeBuckets,
+ new Double[]{3.0, 4.5}, // both elements are Double values
+ Arrays.asList("region", "zone"),
+ new Object[]{"us-west", "z1"}
+ );
+
+ Map<Long, List<TimeSeries>> seriesMap = new LinkedHashMap<>();
+ seriesMap.put(123456789L, Arrays.asList(s1, s2));
+ when(_block.getSeriesMap()).thenReturn(seriesMap);
+
+ BrokerResponseNativeV2 resp = (BrokerResponseNativeV2)
TimeSeriesResponseMapper.toBrokerResponse(_block);
+ ResultTable table = resp.getResultTable();
+ DataSchema schema = table.getDataSchema();
+
+ // schema should include base + first-series tags (region, host), NOT
"zone"
+ List<String> schemaColumns = Arrays.asList(schema.getColumnNames());
+ assertEqualsNoOrder(schemaColumns, Arrays.asList("ts", "values",
"__name__", "region", "host"));
+ int regionIndex = schemaColumns.indexOf("region");
+ int hostIndex = schemaColumns.indexOf("host");
+
+ // verify rows: two rows, tag values present per series
+ List<Object[]> rows = table.getRows();
+ assertEquals(rows.size(), 2);
+
+ // time buckets
+ Object[] r1 = rows.get(0);
+ assertTrue(r1[0] instanceof Long[]);
+ assertEquals(((Long[]) r1[0]).length, 2);
+
+ assertEquals(r1[1], new Double[]{1.0, 2.0});
+ // __name__
+ assertEquals(r1[2], "region=us-west,host=h1");
+ // region & host
+ assertEquals(r1[regionIndex], "us-west");
+ assertEquals(r1[hostIndex], "h1");
+
+ // Row 2 (s2)
+ Object[] r2 = rows.get(1);
+ // Double values should come back unchanged after the mapper's conversion
+ assertEquals(r2[1], new Double[]{3.0, 4.5});
+ // __name__
+ assertEquals(r2[2], "region=us-west,zone=z1");
+ // region present, host missing (null), "zone" absent from schema
+ assertEquals(r2[regionIndex], "us-west");
+ assertNull(r2[hostIndex]);
+ }
+}
diff --git
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/dispatch/QueryDispatcher.java
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/dispatch/QueryDispatcher.java
index c7e4bd5c9ba..2812a7806a9 100644
---
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/dispatch/QueryDispatcher.java
+++
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/dispatch/QueryDispatcher.java
@@ -55,7 +55,6 @@ import org.apache.pinot.common.datatable.StatMap;
import org.apache.pinot.common.failuredetector.FailureDetector;
import org.apache.pinot.common.proto.Plan;
import org.apache.pinot.common.proto.Worker;
-import org.apache.pinot.common.response.PinotBrokerTimeSeriesResponse;
import org.apache.pinot.common.response.broker.QueryProcessingException;
import org.apache.pinot.common.response.broker.ResultTable;
import org.apache.pinot.common.utils.DataSchema;
@@ -698,18 +697,7 @@ public class QueryDispatcher {
_executorService.shutdown();
}
- public PinotBrokerTimeSeriesResponse submitAndGet(RequestContext context,
TimeSeriesDispatchablePlan plan,
- long timeoutMs, Map<String, String> queryOptions) {
- long requestId = context.getRequestId();
- try {
- TimeSeriesBlock result = submitAndGet(requestId, plan, timeoutMs,
queryOptions, context);
- return PinotBrokerTimeSeriesResponse.fromTimeSeriesBlock(result);
- } catch (Throwable t) {
- return
PinotBrokerTimeSeriesResponse.newErrorResponse(t.getClass().getSimpleName(),
t.getMessage());
- }
- }
-
- TimeSeriesBlock submitAndGet(long requestId, TimeSeriesDispatchablePlan
plan, long timeoutMs,
+ public TimeSeriesBlock submitAndGet(long requestId,
TimeSeriesDispatchablePlan plan, long timeoutMs,
Map<String, String> queryOptions, RequestContext requestContext)
throws Exception {
long deadlineMs = System.currentTimeMillis() + timeoutMs;
diff --git
a/pinot-spi/src/main/java/org/apache/pinot/spi/exception/QueryErrorCode.java
b/pinot-spi/src/main/java/org/apache/pinot/spi/exception/QueryErrorCode.java
index a92f5b1be69..592f685afe4 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/exception/QueryErrorCode.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/exception/QueryErrorCode.java
@@ -32,6 +32,7 @@ public enum QueryErrorCode {
JSON_PARSING(100, "JsonParsingError"),
/// Error detected at parsing time. For example, syntax error.
SQL_PARSING(150, "SQLParsingError"),
+ TIMESERIES_PARSING(155, "TimeseriesParsingError"),
SQL_RUNTIME(160, "SQLRuntimeError"),
ACCESS_DENIED(180, "AccessDenied"),
TABLE_DOES_NOT_EXIST(190, "TableDoesNotExistError"),
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]