This is an automated email from the ASF dual-hosted git repository.
ankitsultana pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 23c36b9621a [timeseries] Adding support for partial results in time
series engine (#17278)
23c36b9621a is described below
commit 23c36b9621a3331c3ce3fc681b12cf0b47b77ae8
Author: Shaurya Chaturvedi <[email protected]>
AuthorDate: Thu Dec 18 09:30:58 2025 -0800
[timeseries] Adding support for partial results in time series engine
(#17278)
* [timeseries] Adding support for partial results in time series engine
* Fixed lint
* Removed String.format usage
* Fixed checkstyle
* Addressed comments
* Handled negative polling time
---------
Co-authored-by: shauryachats <[email protected]>
---
.../response/mapper/TimeSeriesResponseMapper.java | 3 ++
.../TimeSeriesExchangeReceiveOperator.java | 49 ++++++++++++++++++----
.../pinot/tsdb/spi/series/TimeSeriesBlock.java | 19 ++++++++-
3 files changed, 62 insertions(+), 9 deletions(-)
diff --git
a/pinot-common/src/main/java/org/apache/pinot/common/response/mapper/TimeSeriesResponseMapper.java
b/pinot-common/src/main/java/org/apache/pinot/common/response/mapper/TimeSeriesResponseMapper.java
index d412c8b8263..be681bc50b3 100644
---
a/pinot-common/src/main/java/org/apache/pinot/common/response/mapper/TimeSeriesResponseMapper.java
+++
b/pinot-common/src/main/java/org/apache/pinot/common/response/mapper/TimeSeriesResponseMapper.java
@@ -60,6 +60,9 @@ public class TimeSeriesResponseMapper {
ResultTable resultTable = new ResultTable(dataSchema, rows);
brokerResponse.setResultTable(resultTable);
+ for (QueryException exception : timeSeriesBlock.getExceptions()) {
+
brokerResponse.addException(QueryProcessingException.fromQueryException(exception));
+ }
setStats(brokerResponse, timeSeriesBlock.getMetadata());
return brokerResponse;
}
diff --git
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/timeseries/TimeSeriesExchangeReceiveOperator.java
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/timeseries/TimeSeriesExchangeReceiveOperator.java
index cf2e06870bc..cb65215c374 100644
---
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/timeseries/TimeSeriesExchangeReceiveOperator.java
+++
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/timeseries/TimeSeriesExchangeReceiveOperator.java
@@ -29,6 +29,8 @@ import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import javax.annotation.Nullable;
import org.apache.pinot.common.datatable.DataTable;
+import org.apache.pinot.spi.exception.QueryErrorCode;
+import org.apache.pinot.spi.exception.QueryException;
import org.apache.pinot.tsdb.spi.AggInfo;
import org.apache.pinot.tsdb.spi.TimeBuckets;
import org.apache.pinot.tsdb.spi.operator.BaseTimeSeriesOperator;
@@ -108,6 +110,9 @@ public class TimeSeriesExchangeReceiveOperator extends
BaseTimeSeriesOperator {
@VisibleForTesting
protected Object poll(long remainingTimeMs)
throws InterruptedException {
+ if (remainingTimeMs <= 0) {
+ return null;
+ }
return _receiver.poll(remainingTimeMs, TimeUnit.MILLISECONDS);
}
@@ -116,13 +121,23 @@ public class TimeSeriesExchangeReceiveOperator extends
BaseTimeSeriesOperator {
TimeBuckets timeBuckets = null;
Map<Long, BaseTimeSeriesBuilder> seriesBuilderMap = new HashMap<>();
Map<String, String> aggregatedStats = new HashMap<>();
+ QueryException timeoutException = null;
+
for (int index = 0; index < _numServersQueried; index++) {
// Step-1: Poll, and ensure we received a TimeSeriesBlock.
long remainingTimeMs = _deadlineMs - System.currentTimeMillis();
- Preconditions.checkState(remainingTimeMs > 0,
- "Timed out before polling all servers. Successfully Polled: %s of
%s", index, _numServersQueried);
Object result = poll(remainingTimeMs);
- Preconditions.checkNotNull(result, "Timed out waiting for response.
Waited: %s ms", remainingTimeMs);
+ if (result == null) {
+ // TODO: List host:port of unresponsive servers in the exception
message.
+ timeoutException = new
QueryException(QueryErrorCode.SERVER_NOT_RESPONDING,
+ "Timed out waiting for response. Waited: "
+ + remainingTimeMs
+ + " ms and successfully polled: "
+ + index
+ + " of "
+ + _numServersQueried);
+ break;
+ }
if (result instanceof Throwable) {
throw (Throwable) result;
}
@@ -164,7 +179,11 @@ public class TimeSeriesExchangeReceiveOperator extends
BaseTimeSeriesOperator {
timeSeriesList.add(entry.getValue().build());
seriesMap.put(seriesHash, timeSeriesList);
}
- return new TimeSeriesBlock(timeBuckets, seriesMap, aggregatedStats);
+ TimeSeriesBlock resultBlock = new TimeSeriesBlock(timeBuckets, seriesMap,
aggregatedStats);
+ if (timeoutException != null) {
+ resultBlock.addToExceptions(timeoutException);
+ }
+ return resultBlock;
}
/**
@@ -193,11 +212,21 @@ public class TimeSeriesExchangeReceiveOperator extends
BaseTimeSeriesOperator {
Map<Long, List<TimeSeries>> timeSeriesMap = new HashMap<>();
TimeBuckets timeBuckets = null;
Map<String, String> aggregatedStats = new HashMap<>();
+ QueryException timeoutException = null;
for (int index = 0; index < _numServersQueried; index++) {
long remainingTimeMs = _deadlineMs - System.currentTimeMillis();
- Preconditions.checkState(remainingTimeMs > 0, "Timed out before polling
exchange receive");
- Object result = _receiver.poll(remainingTimeMs, TimeUnit.MILLISECONDS);
- Preconditions.checkNotNull(result, "Timed out waiting for response.
Waited: %s ms", remainingTimeMs);
+ Object result = poll(remainingTimeMs);
+ // TODO: List host:port of unresponsive servers in the exception message.
+ if (result == null) {
+ timeoutException = new
QueryException(QueryErrorCode.SERVER_NOT_RESPONDING,
+ "Timed out waiting for response. Waited: "
+ + remainingTimeMs
+ + " ms and successfully polled: "
+ + index
+ + " of "
+ + _numServersQueried);
+ break;
+ }
if (result instanceof Throwable) {
throw ((Throwable) result);
}
@@ -218,6 +247,10 @@ public class TimeSeriesExchangeReceiveOperator extends
BaseTimeSeriesOperator {
mergeStats(aggregatedStats, blockToMerge.getMetadata());
}
Preconditions.checkNotNull(timeBuckets, "Time buckets is null in exchange
receive operator");
- return new TimeSeriesBlock(timeBuckets, timeSeriesMap, aggregatedStats);
+ TimeSeriesBlock resultBlock = new TimeSeriesBlock(timeBuckets,
timeSeriesMap, aggregatedStats);
+ if (timeoutException != null) {
+ resultBlock.addToExceptions(timeoutException);
+ }
+ return resultBlock;
}
}
diff --git
a/pinot-timeseries/pinot-timeseries-spi/src/main/java/org/apache/pinot/tsdb/spi/series/TimeSeriesBlock.java
b/pinot-timeseries/pinot-timeseries-spi/src/main/java/org/apache/pinot/tsdb/spi/series/TimeSeriesBlock.java
index cc813ae1ead..c7e4b41a2c3 100644
---
a/pinot-timeseries/pinot-timeseries-spi/src/main/java/org/apache/pinot/tsdb/spi/series/TimeSeriesBlock.java
+++
b/pinot-timeseries/pinot-timeseries-spi/src/main/java/org/apache/pinot/tsdb/spi/series/TimeSeriesBlock.java
@@ -18,10 +18,12 @@
*/
package org.apache.pinot.tsdb.spi.series;
+import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import javax.annotation.Nullable;
+import org.apache.pinot.spi.exception.QueryException;
import org.apache.pinot.tsdb.spi.TimeBuckets;
@@ -41,7 +43,13 @@ public class TimeSeriesBlock {
/**
* Holds optional metadata about the block (e.g., statistics).
*/
- private Map<String, String> _metadata;
+ private final Map<String, String> _metadata;
+ /**
+ * Holds exceptions encountered during processing of the block.
+ */
+ // TODO(timeseries): Exceptions are not serialized and propagated from
servers to brokers currently, need to pass
+ // all exceptions from servers to broker through this only.
+ private final List<QueryException> _exceptions;
public TimeSeriesBlock(@Nullable TimeBuckets timeBuckets, Map<Long,
List<TimeSeries>> seriesMap) {
this(timeBuckets, seriesMap, Map.of());
@@ -52,6 +60,7 @@ public class TimeSeriesBlock {
_timeBuckets = timeBuckets;
_seriesMap = seriesMap;
_metadata = Collections.unmodifiableMap(metadata);
+ _exceptions = new ArrayList<>();
}
@Nullable
@@ -66,4 +75,12 @@ public class TimeSeriesBlock {
public Map<String, String> getMetadata() {
return _metadata;
}
+
+ public List<QueryException> getExceptions() {
+ return _exceptions;
+ }
+
+ public void addToExceptions(QueryException exception) {
+ _exceptions.add(exception);
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]