This is an automated email from the ASF dual-hosted git repository.

jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 99cde4d164 [MSE] For constant expression query, solve it with a single random server (#16083)
99cde4d164 is described below

commit 99cde4d16433a3ded02a5a6a6c3b230d048bc8c0
Author: Xiaotian (Jackie) Jiang <17555551+jackie-ji...@users.noreply.github.com>
AuthorDate: Thu Jun 12 11:08:48 2025 -0600

    [MSE] For constant expression query, solve it with a single random server (#16083)
---
 .../MultiNodesOfflineClusterIntegrationTest.java   | 12 +++
 .../apache/pinot/query/routing/WorkerManager.java  | 97 +++++++++++++---------
 2 files changed, 69 insertions(+), 40 deletions(-)

diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/MultiNodesOfflineClusterIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/MultiNodesOfflineClusterIntegrationTest.java
index e73f47fe0b..91761b31c8 100644
--- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/MultiNodesOfflineClusterIntegrationTest.java
+++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/MultiNodesOfflineClusterIntegrationTest.java
@@ -288,6 +288,18 @@ public class MultiNodesOfflineClusterIntegrationTest extends OfflineClusterInteg
     assertEquals(row.get(1).doubleValue(), 725560.0 / 444);
   }
 
+  @Test
+  public void testConstantExpressionQuery()
+      throws Exception {
+    setUseMultiStageQueryEngine(true);
+
+    JsonNode result = postQuery("SELECT 1");
+    assertEquals(result.get("numServersQueried").intValue(), 1);
+
+    result = postQuery("SELECT DaysSinceEpoch, AVG(CRSArrTime) FROM mytable 
WHERE false GROUP BY 1 ORDER BY 2 DESC");
+    assertEquals(result.get("numServersQueried").intValue(), 1);
+  }
+
   // Disabled because segments might not be server partitioned with multiple 
servers
   @Test(enabled = false)
   @Override
diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/routing/WorkerManager.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/routing/WorkerManager.java
index 4a28f11632..c35799d69a 100644
--- a/pinot-query-planner/src/main/java/org/apache/pinot/query/routing/WorkerManager.java
+++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/routing/WorkerManager.java
@@ -22,9 +22,11 @@ import com.google.common.base.Preconditions;
 import com.google.common.collect.Maps;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Random;
@@ -191,6 +193,25 @@ public class WorkerManager {
     Map<Integer, DispatchablePlanMetadata> metadataMap = 
context.getDispatchablePlanMetadataMap();
     DispatchablePlanMetadata metadata = 
metadataMap.get(fragment.getFragmentId());
 
+    if (context.getTableNames().isEmpty()) {
+      // For constant expression query (no table is accessed), assign it to a 
random enabled server.
+      // TODO: Consider short-circuiting it and directly calculating the 
result on broker.
+
+      Collection<ServerInstance> serverInstances = 
_routingManager.getEnabledServerInstanceMap().values();
+      int numServers = serverInstances.size();
+      if (numServers == 0) {
+        LOGGER.error("[RequestId: {}] No server instance found for constant 
expression query", context.getRequestId());
+        throw new IllegalStateException("No server instance found for constant 
expression query");
+      }
+      int index = RANDOM.nextInt(numServers);
+      Iterator<ServerInstance> iterator = serverInstances.iterator();
+      for (int i = 0; i < index; i++) {
+        iterator.next();
+      }
+      metadata.setWorkerIdToServerInstanceMap(Map.of(0, new 
QueryServerInstance(iterator.next())));
+      return;
+    }
+
     if (isPrePartitionAssignment(children, metadataMap)) {
       // If all the children are pre-partitioned the same way, use local 
exchange.
       DispatchablePlanMetadata firstChildMetadata = 
metadataMap.get(children.get(0).getFragmentId());
@@ -200,8 +221,8 @@ public class WorkerManager {
     }
 
     if (metadata.isRequiresSingletonInstance()) {
-      // When singleton instance is required, return a single worker with ID 0.
-      List<ServerInstance> serverInstances = assignServerInstances(context);
+      // When singleton instance is required, assign it to a random candidate 
server.
+      List<ServerInstance> serverInstances = getCandidateServers(context);
       metadata.setWorkerIdToServerInstanceMap(
           Map.of(0, new 
QueryServerInstance(serverInstances.get(RANDOM.nextInt(serverInstances.size())))));
       return;
@@ -221,7 +242,7 @@ public class WorkerManager {
     // If there is no local exchange, assign workers to the servers hosting 
the tables
     List<ServerInstance> serverInstances = null;
     if (workerIdToServerInstanceMap == null) {
-      serverInstances = assignServerInstances(context);
+      serverInstances = getCandidateServers(context);
       int stageParallelism = Integer.parseInt(
           
context.getPlannerContext().getOptions().getOrDefault(QueryOptionKey.STAGE_PARALLELISM,
 "1"));
       workerIdToServerInstanceMap = 
Maps.newHashMapWithExpectedSize(serverInstances.size() * stageParallelism);
@@ -319,48 +340,44 @@ public class WorkerManager {
   /**
    * Returns the servers serving any segment of the tables in the query.
    */
-  private List<ServerInstance> assignServerInstances(DispatchablePlanContext 
context) {
+  private List<ServerInstance> getCandidateServers(DispatchablePlanContext 
context) {
     List<ServerInstance> serverInstances;
     Set<String> tableNames = context.getTableNames();
+    assert tableNames != null;
     Map<String, ServerInstance> enabledServerInstanceMap = 
_routingManager.getEnabledServerInstanceMap();
-    if (tableNames.isEmpty()) {
-      // TODO: Short circuit it when no table needs to be scanned
-      // This could be the case from queries that don't actually fetch values 
from the tables. In such cases the
-      // routing need not be tenant aware.
-      // Eg: SELECT 1 AS one FROM select_having_expression_test_test_having 
HAVING 1 > 2;
-      serverInstances = new ArrayList<>(enabledServerInstanceMap.values());
-    } else {
-      Set<String> servers = new HashSet<>();
-      for (String tableName : tableNames) {
-        TableType tableType = 
TableNameBuilder.getTableTypeFromTableName(tableName);
-        if (tableType == null) {
-          Set<String> offlineTableServers = 
_routingManager.getServingInstances(
-              
TableNameBuilder.forType(TableType.OFFLINE).tableNameWithType(tableName));
-          if (offlineTableServers != null) {
-            servers.addAll(offlineTableServers);
-          }
-          Set<String> realtimeTableServers = 
_routingManager.getServingInstances(
-              
TableNameBuilder.forType(TableType.REALTIME).tableNameWithType(tableName));
-          if (realtimeTableServers != null) {
-            servers.addAll(realtimeTableServers);
-          }
-        } else {
-          Set<String> tableServers = 
_routingManager.getServingInstances(tableName);
-          if (tableServers != null) {
-            servers.addAll(tableServers);
-          }
+    Set<String> servers = new HashSet<>();
+    for (String tableName : tableNames) {
+      TableType tableType = 
TableNameBuilder.getTableTypeFromTableName(tableName);
+      if (tableType == null) {
+        Set<String> offlineTableServers = _routingManager.getServingInstances(
+            
TableNameBuilder.forType(TableType.OFFLINE).tableNameWithType(tableName));
+        if (offlineTableServers != null) {
+          servers.addAll(offlineTableServers);
+        }
+        Set<String> realtimeTableServers = _routingManager.getServingInstances(
+            
TableNameBuilder.forType(TableType.REALTIME).tableNameWithType(tableName));
+        if (realtimeTableServers != null) {
+          servers.addAll(realtimeTableServers);
         }
-      }
-      if (servers.isEmpty()) {
-        // fall back to use all enabled servers if no server is found for the 
tables
-        serverInstances = new ArrayList<>(enabledServerInstanceMap.values());
       } else {
-        serverInstances = new ArrayList<>(servers.size());
-        for (String server : servers) {
-          ServerInstance serverInstance = enabledServerInstanceMap.get(server);
-          if (serverInstance != null) {
-            serverInstances.add(serverInstance);
-          }
+        Set<String> tableServers = 
_routingManager.getServingInstances(tableName);
+        if (tableServers != null) {
+          servers.addAll(tableServers);
+        }
+      }
+    }
+    if (servers.isEmpty()) {
+      // Fall back to use all enabled servers if no server is found for the 
tables.
+      // TODO: Revisit if we should throw an exception instead.
+      LOGGER.warn("[RequestId: {}] No server instance found for intermediate 
stage for tables: {}, "
+          + "falling back to all enabled servers", context.getRequestId(), 
tableNames);
+      serverInstances = new ArrayList<>(enabledServerInstanceMap.values());
+    } else {
+      serverInstances = new ArrayList<>(servers.size());
+      for (String server : servers) {
+        ServerInstance serverInstance = enabledServerInstanceMap.get(server);
+        if (serverInstance != null) {
+          serverInstances.add(serverInstance);
         }
       }
     }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org

Reply via email to