This is an automated email from the ASF dual-hosted git repository.

ankitsultana pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 82b7d2a6632 [timeseries] Adding support for query options (#17454)
82b7d2a6632 is described below

commit 82b7d2a663259c0819cf60501d2bcdf849ac3cef
Author: Shaurya Chaturvedi <[email protected]>
AuthorDate: Mon Jan 5 19:01:24 2026 -0800

    [timeseries] Adding support for query options (#17454)
    
    * [timeseries] Adding support for query options for timeseries engine
    
    * Fixed checkstyle
    
    * Adding integration tests
    
    * Fixed endpoint to handle queryOptions as map
    
    * Verified that non-string query options work correctly
    
    * Fixed comments
    
    ---------
    
    Co-authored-by: shauryachats <[email protected]>
---
 .../broker/api/resources/PinotClientRequest.java   | 17 ++++++++++----
 .../requesthandler/TimeSeriesRequestHandler.java   | 18 +++++++++++++--
 .../api/resources/PinotQueryResource.java          | 26 +++++++++++++++++-----
 .../pinot/integration/tests/ClusterTest.java       | 17 ++++++++++++--
 .../tests/TimeSeriesIntegrationTest.java           | 21 +++++++++++++++++
 .../pinot/tsdb/m3ql/M3TimeSeriesPlanner.java       |  1 +
 .../pinot/tsdb/spi/RangeTimeSeriesRequest.java     | 10 ++++++++-
 7 files changed, 95 insertions(+), 15 deletions(-)

diff --git 
a/pinot-broker/src/main/java/org/apache/pinot/broker/api/resources/PinotClientRequest.java
 
b/pinot-broker/src/main/java/org/apache/pinot/broker/api/resources/PinotClientRequest.java
index 4e7105a8f5e..b33369a3e94 100644
--- 
a/pinot-broker/src/main/java/org/apache/pinot/broker/api/resources/PinotClientRequest.java
+++ 
b/pinot-broker/src/main/java/org/apache/pinot/broker/api/resources/PinotClientRequest.java
@@ -325,14 +325,23 @@ public class PinotClientRequest {
   @Path("query/timeseries")
   @ApiOperation(value = "Query Pinot using the Time Series Engine")
   @ManualAuthorization
-  public void processTimeSeriesQueryEngine(Map<String, String> queryParams, 
@Suspended AsyncResponse asyncResponse,
+  public void processTimeSeriesQueryEngine(JsonNode requestJson, @Suspended 
AsyncResponse asyncResponse,
       @Context org.glassfish.grizzly.http.server.Request requestCtx, @Context 
HttpHeaders httpHeaders) {
     try {
-      if (!queryParams.containsKey(Request.QUERY)) {
+      if (!requestJson.has(Request.QUERY)) {
         throw new IllegalStateException("Payload is missing the query string 
field 'query'");
       }
-      String language = queryParams.get(Request.LANGUAGE);
-      String queryString = queryParams.get(Request.QUERY);
+      String language = requestJson.has(Request.LANGUAGE) ? 
requestJson.get(Request.LANGUAGE).asText() : null;
+      String queryString = requestJson.get(Request.QUERY).asText();
+      Map<String, String> queryParams = new HashMap<>();
+      requestJson.fields().forEachRemaining(entry -> {
+          if (entry.getValue().isTextual()) {
+            queryParams.put(entry.getKey(), entry.getValue().asText());
+          } else {
+            queryParams.put(entry.getKey(), entry.getValue().toString());
+          }
+      });
+
       try (RequestScope requestContext = 
Tracing.getTracer().createRequestScope()) {
         TimeSeriesBlock timeSeriesBlock = executeTimeSeriesQuery(language, 
queryString, queryParams,
             requestContext, makeHttpIdentity(requestCtx), httpHeaders);
diff --git 
a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/TimeSeriesRequestHandler.java
 
b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/TimeSeriesRequestHandler.java
index 309a53dddc3..92cbb6285c2 100644
--- 
a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/TimeSeriesRequestHandler.java
+++ 
b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/TimeSeriesRequestHandler.java
@@ -18,6 +18,7 @@
  */
 package org.apache.pinot.broker.requesthandler;
 
+import com.fasterxml.jackson.core.type.TypeReference;
 import com.fasterxml.jackson.databind.JsonNode;
 import com.google.common.base.Preconditions;
 import java.net.URI;
@@ -62,6 +63,7 @@ import org.apache.pinot.spi.env.PinotConfiguration;
 import org.apache.pinot.spi.exception.QueryErrorCode;
 import org.apache.pinot.spi.exception.QueryException;
 import org.apache.pinot.spi.trace.RequestContext;
+import org.apache.pinot.spi.utils.JsonUtils;
 import org.apache.pinot.spi.utils.builder.TableNameBuilder;
 import org.apache.pinot.sql.parsers.CalciteSqlParser;
 import org.apache.pinot.sql.parsers.SqlNodeAndOptions;
@@ -201,7 +203,7 @@ public class TimeSeriesRequestHandler extends 
BaseBrokerRequestHandler {
     Long stepSeconds = getStepSeconds(mergedParams.get("step"));
     Duration timeout = StringUtils.isNotBlank(mergedParams.get("timeout"))
         ? HumanReadableDuration.from(mergedParams.get("timeout")) : 
Duration.ofMillis(_brokerTimeoutMs);
-
+    Map<String, String> queryOptions = 
parseQueryOptionsFromJson(mergedParams.get("queryOptions"));
     Preconditions.checkNotNull(query, "Query cannot be null");
     Preconditions.checkNotNull(startTs, "Start time cannot be null");
     Preconditions.checkNotNull(endTs, "End time cannot be null");
@@ -210,10 +212,22 @@ public class TimeSeriesRequestHandler extends 
BaseBrokerRequestHandler {
     return new RangeTimeSeriesRequest(language, query, startTs, endTs, 
stepSeconds, timeout,
         parseIntOrDefault(mergedParams.get("limit"), 
RangeTimeSeriesRequest.DEFAULT_SERIES_LIMIT),
         parseIntOrDefault(mergedParams.get("numGroupsLimit"), 
RangeTimeSeriesRequest.DEFAULT_NUM_GROUPS_LIMIT),
-        queryParamString
+        queryParamString, queryOptions
     );
   }
 
+  private Map<String, String> parseQueryOptionsFromJson(String 
queryOptionsJson) {
+    if (queryOptionsJson == null || queryOptionsJson.isEmpty()) {
+      return Map.of();
+    }
+    try {
+      return JsonUtils.stringToObject(queryOptionsJson, new TypeReference<>() 
{ });
+    } catch (Exception e) {
+      LOGGER.warn("Failed to parse queryOptions JSON: {}", queryOptionsJson, 
e);
+      return Map.of();
+    }
+  }
+
   private Long parseLongSafe(String value) {
     return value != null ? Long.parseLong(value) : null;
   }
diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotQueryResource.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotQueryResource.java
index 2f1656b0bbc..3188693f84d 100644
--- 
a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotQueryResource.java
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotQueryResource.java
@@ -33,6 +33,7 @@ import java.nio.charset.StandardCharsets;
 import java.time.Duration;
 import java.util.ArrayList;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
@@ -181,7 +182,14 @@ public class PinotQueryResource {
       String end = requestJson.has("end") ? requestJson.get("end").asText() : 
null;
       String step = requestJson.has("step") ? requestJson.get("step").asText() 
: null;
 
-      return executeTimeSeriesQueryCatching(httpHeaders, language, query, 
start, end, step, true);
+      Map<String, String> queryOptions = new HashMap<>();
+      if (requestJson.has("queryOptions") && 
requestJson.get("queryOptions").isObject()) {
+        requestJson.get("queryOptions").fields().forEachRemaining(entry -> {
+          queryOptions.put(entry.getKey(), entry.getValue().asText());
+        });
+      }
+
+      return executeTimeSeriesQueryCatching(httpHeaders, language, query, 
start, end, step, queryOptions, true);
     } catch (Exception e) {
       LOGGER.error("Caught exception while processing POST timeseries 
request", e);
       return constructQueryExceptionResponse(QueryErrorCode.INTERNAL, 
e.getMessage());
@@ -687,16 +695,16 @@ public class PinotQueryResource {
 
   private StreamingOutput executeTimeSeriesQueryCatching(HttpHeaders 
httpHeaders, String language, String query,
     String start, String end, String step) {
-    return executeTimeSeriesQueryCatching(httpHeaders, language, query, start, 
end, step, false);
+    return executeTimeSeriesQueryCatching(httpHeaders, language, query, start, 
end, step, Map.of(), false);
   }
 
   private StreamingOutput executeTimeSeriesQueryCatching(HttpHeaders 
httpHeaders, String language, String query,
-    String start, String end, String step, boolean useBrokerCompatibleApi) {
+    String start, String end, String step, Map<String, String> queryOptions, 
boolean useBrokerCompatibleApi) {
     try {
       LOGGER.debug("Language: {}, Query: {}, Start: {}, End: {}, Step: {}, 
UseBrokerAPI: {}",
           language, query, start, end, step, useBrokerCompatibleApi);
       String instanceId = retrieveBrokerForTimeSeriesQuery(query, language, 
start, end);
-      return sendTimeSeriesRequestToBroker(language, query, start, end, step, 
instanceId, httpHeaders,
+      return sendTimeSeriesRequestToBroker(language, query, start, end, step, 
queryOptions, instanceId, httpHeaders,
           useBrokerCompatibleApi);
     } catch (QueryException ex) {
       LOGGER.warn("Caught exception while processing timeseries request {}", 
ex.getMessage());
@@ -714,7 +722,7 @@ public class PinotQueryResource {
     TimeSeriesLogicalPlanner planner = 
TimeSeriesQueryEnvironment.buildLogicalPlanner(language, _controllerConf);
     TimeSeriesLogicalPlanResult planResult = planner.plan(
         new RangeTimeSeriesRequest(language, query, Integer.parseInt(start), 
Long.parseLong(end),
-            60L, Duration.ofMinutes(1), 100, 100, ""),
+            60L, Duration.ofMinutes(1), 100, 100, "", Map.of()),
         new 
TimeSeriesTableMetadataProvider(_pinotHelixResourceManager.getTableCache()));
     String tableName = planner.getTableName(planResult);
     String rawTableName = TableNameBuilder.extractRawTableName(tableName);
@@ -724,7 +732,8 @@ public class PinotQueryResource {
 
 
   private StreamingOutput sendTimeSeriesRequestToBroker(String language, 
String query, String start, String end,
-    String step, String instanceId, HttpHeaders httpHeaders, boolean 
useBrokerCompatibleApi) {
+    String step, Map<String, String> queryOptions, String instanceId, 
HttpHeaders httpHeaders,
+    boolean useBrokerCompatibleApi) {
     InstanceConfig instanceConfig = getInstanceConfig(instanceId);
     String hostName = getHost(instanceConfig);
     String protocol = _controllerConf.getControllerBrokerProtocol();
@@ -750,6 +759,11 @@ public class PinotQueryResource {
       if (step != null && !step.isEmpty()) {
         requestJson.put("step", step);
       }
+      if (queryOptions != null && !queryOptions.isEmpty()) {
+        ObjectNode queryOptionsNode = JsonUtils.newObjectNode();
+        queryOptions.forEach(queryOptionsNode::put);
+        requestJson.set("queryOptions", queryOptionsNode);
+      }
       return sendRequestRaw(url, "POST", query, requestJson, headers);
     } else {
       // Use GET /timeseries/api/v1/query_range endpoint (Prometheus 
compatible API)
diff --git 
a/pinot-integration-test-base/src/test/java/org/apache/pinot/integration/tests/ClusterTest.java
 
b/pinot-integration-test-base/src/test/java/org/apache/pinot/integration/tests/ClusterTest.java
index 476003cff72..f166a05520f 100644
--- 
a/pinot-integration-test-base/src/test/java/org/apache/pinot/integration/tests/ClusterTest.java
+++ 
b/pinot-integration-test-base/src/test/java/org/apache/pinot/integration/tests/ClusterTest.java
@@ -582,9 +582,22 @@ public abstract class ClusterTest extends ControllerTest {
 
   public JsonNode postTimeseriesQuery(String baseUrl, String query, long 
startTime, long endTime,
       Map<String, String> headers) {
+    return postTimeseriesQuery(baseUrl, query, startTime, endTime, headers, 
null);
+  }
+
+  public JsonNode postTimeseriesQuery(String baseUrl, String query, long 
startTime, long endTime,
+      Map<String, String> headers, Map<String, Object> queryOptions) {
     try {
-      Map<String, String> payload = Map.of("language", "m3ql", "query", query, 
"start",
-          String.valueOf(startTime), "end", String.valueOf(endTime));
+      ObjectNode payload = JsonUtils.newObjectNode();
+      payload.put("language", "m3ql");
+      payload.put("query", query);
+      payload.put("start", String.valueOf(startTime));
+      payload.put("end", String.valueOf(endTime));
+      if (queryOptions != null && !queryOptions.isEmpty()) {
+        ObjectNode queryOptionsNode = JsonUtils.newObjectNode();
+        queryOptions.forEach(queryOptionsNode::putPOJO);
+        payload.set("queryOptions", queryOptionsNode);
+      }
       return JsonUtils.stringToJsonNode(
           sendPostRequest(baseUrl + "/query/timeseries", 
JsonUtils.objectToString(payload), headers));
     } catch (Exception e) {
diff --git 
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/TimeSeriesIntegrationTest.java
 
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/TimeSeriesIntegrationTest.java
index d3aa6b62250..13c9d6e8437 100644
--- 
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/TimeSeriesIntegrationTest.java
+++ 
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/TimeSeriesIntegrationTest.java
@@ -239,6 +239,27 @@ public class TimeSeriesIntegrationTest extends 
BaseClusterIntegrationTest {
     assertTrue(firstException.get("message").asText().contains("Cannot apply 
JSON_MATCH on column"));
   }
 
+  @Test
+  public void testQueryOptionsNumGroupsLimit() {
+    String query = String.format(
+        
"fetch{table=\"mytable_OFFLINE\",filter=\"\",ts_column=\"%s\",ts_unit=\"MILLISECONDS\",value=\"%s\"}"
+            + " | max{%s} | transformNull{0} | keepLastValue{}",
+        TS_COLUMN, TOTAL_TRIPS_COLUMN, DEVICE_OS_COLUMN
+    );
+    // This query would normally return 3 groups (one for each device OS: 
windows, android, ios)
+    // With numGroupsLimit=1 query option, we expect only 1 group
+    JsonNode resultWithLimit = postTimeseriesQuery(getBrokerBaseApiUrl(), 
query, QUERY_START_TIME_SEC,
+        QUERY_END_TIME_SEC, getHeaders(), Map.of("numGroupsLimit", 1, 
"enableNullHandling", true));
+    assertNotNull(resultWithLimit);
+    assertEquals(resultWithLimit.path("numRowsResultSet").asInt(), 1,
+        "Expected only 1 group with numGroupsLimit=1 query option");
+
+    JsonNode resultTable = resultWithLimit.path("resultTable");
+    assertNotNull(resultTable);
+    JsonNode rows = resultTable.path("rows");
+    assertEquals(rows.size(), 1, "Expected only 1 row in result table with 
numGroupsLimit=1");
+  }
+
   @DataProvider(name = "isBrokerResponseCompatible")
   public Object[][] isBrokerResponseCompatible() {
     return new Object[][]{
diff --git 
a/pinot-plugins/pinot-timeseries-lang/pinot-timeseries-m3ql/src/main/java/org/apache/pinot/tsdb/m3ql/M3TimeSeriesPlanner.java
 
b/pinot-plugins/pinot-timeseries-lang/pinot-timeseries-m3ql/src/main/java/org/apache/pinot/tsdb/m3ql/M3TimeSeriesPlanner.java
index 57fc6ab8bc4..9bcd37e5f9a 100644
--- 
a/pinot-plugins/pinot-timeseries-lang/pinot-timeseries-m3ql/src/main/java/org/apache/pinot/tsdb/m3ql/M3TimeSeriesPlanner.java
+++ 
b/pinot-plugins/pinot-timeseries-lang/pinot-timeseries-m3ql/src/main/java/org/apache/pinot/tsdb/m3ql/M3TimeSeriesPlanner.java
@@ -166,6 +166,7 @@ public class M3TimeSeriesPlanner implements 
TimeSeriesLogicalPlanner {
     if (request.getNumGroupsLimit() > 0) {
       queryOptions.put("numGroupsLimit", 
Integer.toString(request.getNumGroupsLimit()));
     }
+    queryOptions.putAll(request.getQueryOptions());
     return new LeafTimeSeriesPlanNode(planId, children, tableName, timeColumn, 
timeUnit, 0L, filter, valueExpr, aggInfo,
         groupByColumns, request.getLimit(), queryOptions);
   }
diff --git 
a/pinot-timeseries/pinot-timeseries-spi/src/main/java/org/apache/pinot/tsdb/spi/RangeTimeSeriesRequest.java
 
b/pinot-timeseries/pinot-timeseries-spi/src/main/java/org/apache/pinot/tsdb/spi/RangeTimeSeriesRequest.java
index 7a3511dbfd8..77a9fc1493d 100644
--- 
a/pinot-timeseries/pinot-timeseries-spi/src/main/java/org/apache/pinot/tsdb/spi/RangeTimeSeriesRequest.java
+++ 
b/pinot-timeseries/pinot-timeseries-spi/src/main/java/org/apache/pinot/tsdb/spi/RangeTimeSeriesRequest.java
@@ -20,6 +20,7 @@ package org.apache.pinot.tsdb.spi;
 
 import com.google.common.base.Preconditions;
 import java.time.Duration;
+import java.util.Map;
 
 
 /**
@@ -72,9 +73,11 @@ public class RangeTimeSeriesRequest {
   private final int _numGroupsLimit;
   /** Full query string to allow language implementations to pass custom 
parameters. */
   private final String _fullQueryString;
+  /** Query options to allow language implementations to parse custom options. 
*/
+  private final Map<String, String> _queryOptions;
 
   public RangeTimeSeriesRequest(String language, String query, long 
startSeconds, long endSeconds, long stepSeconds,
-      Duration timeout, int limit, int numGroupsLimit, String fullQueryString) 
{
+      Duration timeout, int limit, int numGroupsLimit, String fullQueryString, 
Map<String, String> queryOptions) {
     Preconditions.checkState(endSeconds >= startSeconds, "Invalid range. 
startSeconds "
         + "should be greater than or equal to endSeconds. Found 
startSeconds=%s and endSeconds=%s",
         startSeconds, endSeconds);
@@ -87,6 +90,7 @@ public class RangeTimeSeriesRequest {
     _limit = limit;
     _numGroupsLimit = numGroupsLimit;
     _fullQueryString = fullQueryString;
+    _queryOptions = queryOptions;
   }
 
   public String getLanguage() {
@@ -124,4 +128,8 @@ public class RangeTimeSeriesRequest {
   public String getFullQueryString() {
     return _fullQueryString;
   }
+
+  public Map<String, String> getQueryOptions() {
+    return _queryOptions;
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to