This is an automated email from the ASF dual-hosted git repository.

ankitsultana pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 23c36b9621a [timeseries] Adding support for partial results in time series engine (#17278)
23c36b9621a is described below

commit 23c36b9621a3331c3ce3fc681b12cf0b47b77ae8
Author: Shaurya Chaturvedi <[email protected]>
AuthorDate: Thu Dec 18 09:30:58 2025 -0800

    [timeseries] Adding support for partial results in time series engine (#17278)
    
    * [timeseries] Adding support for partial results in time series engine
    
    * Fixed lint
    
    * Removed String.format usage
    
    * Fixed checkstyle
    
    * Addressed comments
    
    * Handled negative polling time
    
    ---------
    
    Co-authored-by: shauryachats <[email protected]>
---
 .../response/mapper/TimeSeriesResponseMapper.java  |  3 ++
 .../TimeSeriesExchangeReceiveOperator.java         | 49 ++++++++++++++++++----
 .../pinot/tsdb/spi/series/TimeSeriesBlock.java     | 19 ++++++++-
 3 files changed, 62 insertions(+), 9 deletions(-)

diff --git 
a/pinot-common/src/main/java/org/apache/pinot/common/response/mapper/TimeSeriesResponseMapper.java
 
b/pinot-common/src/main/java/org/apache/pinot/common/response/mapper/TimeSeriesResponseMapper.java
index d412c8b8263..be681bc50b3 100644
--- 
a/pinot-common/src/main/java/org/apache/pinot/common/response/mapper/TimeSeriesResponseMapper.java
+++ 
b/pinot-common/src/main/java/org/apache/pinot/common/response/mapper/TimeSeriesResponseMapper.java
@@ -60,6 +60,9 @@ public class TimeSeriesResponseMapper {
 
     ResultTable resultTable = new ResultTable(dataSchema, rows);
     brokerResponse.setResultTable(resultTable);
+    for (QueryException exception : timeSeriesBlock.getExceptions()) {
+      
brokerResponse.addException(QueryProcessingException.fromQueryException(exception));
+    }
     setStats(brokerResponse, timeSeriesBlock.getMetadata());
     return brokerResponse;
   }
diff --git 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/timeseries/TimeSeriesExchangeReceiveOperator.java
 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/timeseries/TimeSeriesExchangeReceiveOperator.java
index cf2e06870bc..cb65215c374 100644
--- 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/timeseries/TimeSeriesExchangeReceiveOperator.java
+++ 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/timeseries/TimeSeriesExchangeReceiveOperator.java
@@ -29,6 +29,8 @@ import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.TimeUnit;
 import javax.annotation.Nullable;
 import org.apache.pinot.common.datatable.DataTable;
+import org.apache.pinot.spi.exception.QueryErrorCode;
+import org.apache.pinot.spi.exception.QueryException;
 import org.apache.pinot.tsdb.spi.AggInfo;
 import org.apache.pinot.tsdb.spi.TimeBuckets;
 import org.apache.pinot.tsdb.spi.operator.BaseTimeSeriesOperator;
@@ -108,6 +110,9 @@ public class TimeSeriesExchangeReceiveOperator extends 
BaseTimeSeriesOperator {
   @VisibleForTesting
   protected Object poll(long remainingTimeMs)
       throws InterruptedException {
+    if (remainingTimeMs <= 0) {
+      return null;
+    }
     return _receiver.poll(remainingTimeMs, TimeUnit.MILLISECONDS);
   }
 
@@ -116,13 +121,23 @@ public class TimeSeriesExchangeReceiveOperator extends 
BaseTimeSeriesOperator {
     TimeBuckets timeBuckets = null;
     Map<Long, BaseTimeSeriesBuilder> seriesBuilderMap = new HashMap<>();
     Map<String, String> aggregatedStats = new HashMap<>();
+    QueryException timeoutException = null;
+
     for (int index = 0; index < _numServersQueried; index++) {
       // Step-1: Poll, and ensure we received a TimeSeriesBlock.
       long remainingTimeMs = _deadlineMs - System.currentTimeMillis();
-      Preconditions.checkState(remainingTimeMs > 0,
-          "Timed out before polling all servers. Successfully Polled: %s of 
%s", index, _numServersQueried);
       Object result = poll(remainingTimeMs);
-      Preconditions.checkNotNull(result, "Timed out waiting for response. 
Waited: %s ms", remainingTimeMs);
+      if (result == null) {
+        // TODO: List host:port of unresponsive servers in the exception 
message.
+        timeoutException = new 
QueryException(QueryErrorCode.SERVER_NOT_RESPONDING,
+          "Timed out waiting for response. Waited: "
+            + remainingTimeMs
+            + " ms and successfully polled: "
+            + index
+            + " of "
+            + _numServersQueried);
+        break;
+      }
       if (result instanceof Throwable) {
         throw (Throwable) result;
       }
@@ -164,7 +179,11 @@ public class TimeSeriesExchangeReceiveOperator extends 
BaseTimeSeriesOperator {
       timeSeriesList.add(entry.getValue().build());
       seriesMap.put(seriesHash, timeSeriesList);
     }
-    return new TimeSeriesBlock(timeBuckets, seriesMap, aggregatedStats);
+    TimeSeriesBlock resultBlock = new TimeSeriesBlock(timeBuckets, seriesMap, 
aggregatedStats);
+    if (timeoutException != null) {
+      resultBlock.addToExceptions(timeoutException);
+    }
+    return resultBlock;
   }
 
   /**
@@ -193,11 +212,21 @@ public class TimeSeriesExchangeReceiveOperator extends 
BaseTimeSeriesOperator {
     Map<Long, List<TimeSeries>> timeSeriesMap = new HashMap<>();
     TimeBuckets timeBuckets = null;
     Map<String, String> aggregatedStats = new HashMap<>();
+    QueryException timeoutException = null;
     for (int index = 0; index < _numServersQueried; index++) {
       long remainingTimeMs = _deadlineMs - System.currentTimeMillis();
-      Preconditions.checkState(remainingTimeMs > 0, "Timed out before polling 
exchange receive");
-      Object result = _receiver.poll(remainingTimeMs, TimeUnit.MILLISECONDS);
-      Preconditions.checkNotNull(result, "Timed out waiting for response. 
Waited: %s ms", remainingTimeMs);
+      Object result = poll(remainingTimeMs);
+      // TODO: List host:port of unresponsive servers in the exception message.
+      if (result == null) {
+        timeoutException = new 
QueryException(QueryErrorCode.SERVER_NOT_RESPONDING,
+            "Timed out waiting for response. Waited: "
+                + remainingTimeMs
+                + " ms and successfully polled: "
+                + index
+                + " of "
+                + _numServersQueried);
+        break;
+      }
       if (result instanceof Throwable) {
         throw ((Throwable) result);
       }
@@ -218,6 +247,10 @@ public class TimeSeriesExchangeReceiveOperator extends 
BaseTimeSeriesOperator {
       mergeStats(aggregatedStats, blockToMerge.getMetadata());
     }
     Preconditions.checkNotNull(timeBuckets, "Time buckets is null in exchange 
receive operator");
-    return new TimeSeriesBlock(timeBuckets, timeSeriesMap, aggregatedStats);
+    TimeSeriesBlock resultBlock = new TimeSeriesBlock(timeBuckets, 
timeSeriesMap, aggregatedStats);
+    if (timeoutException != null) {
+      resultBlock.addToExceptions(timeoutException);
+    }
+    return resultBlock;
   }
 }
diff --git 
a/pinot-timeseries/pinot-timeseries-spi/src/main/java/org/apache/pinot/tsdb/spi/series/TimeSeriesBlock.java
 
b/pinot-timeseries/pinot-timeseries-spi/src/main/java/org/apache/pinot/tsdb/spi/series/TimeSeriesBlock.java
index cc813ae1ead..c7e4b41a2c3 100644
--- 
a/pinot-timeseries/pinot-timeseries-spi/src/main/java/org/apache/pinot/tsdb/spi/series/TimeSeriesBlock.java
+++ 
b/pinot-timeseries/pinot-timeseries-spi/src/main/java/org/apache/pinot/tsdb/spi/series/TimeSeriesBlock.java
@@ -18,10 +18,12 @@
  */
 package org.apache.pinot.tsdb.spi.series;
 
+import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import javax.annotation.Nullable;
+import org.apache.pinot.spi.exception.QueryException;
 import org.apache.pinot.tsdb.spi.TimeBuckets;
 
 
@@ -41,7 +43,13 @@ public class TimeSeriesBlock {
   /**
    * Holds optional metadata about the block (e.g., statistics).
    */
-  private Map<String, String> _metadata;
+  private final Map<String, String> _metadata;
+  /**
+   * Holds exceptions encountered during processing of the block.
+   */
+  // TODO(timeseries): Exceptions are not serialized and propagated from 
servers to brokers currently, need to pass
+  // all exceptions from servers to broker through this only.
+  private final List<QueryException> _exceptions;
 
   public TimeSeriesBlock(@Nullable TimeBuckets timeBuckets, Map<Long, 
List<TimeSeries>> seriesMap) {
     this(timeBuckets, seriesMap, Map.of());
@@ -52,6 +60,7 @@ public class TimeSeriesBlock {
     _timeBuckets = timeBuckets;
     _seriesMap = seriesMap;
     _metadata = Collections.unmodifiableMap(metadata);
+    _exceptions = new ArrayList<>();
   }
 
   @Nullable
@@ -66,4 +75,12 @@ public class TimeSeriesBlock {
   public Map<String, String> getMetadata() {
     return _metadata;
   }
+
+  public List<QueryException> getExceptions() {
+    return _exceptions;
+  }
+
+  public void addToExceptions(QueryException exception) {
+    _exceptions.add(exception);
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to