This is an automated email from the ASF dual-hosted git repository. jackie pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push: new 99cde4d164 [MSE] For constant expression query, solve it with a single random server (#16083) 99cde4d164 is described below commit 99cde4d16433a3ded02a5a6a6c3b230d048bc8c0 Author: Xiaotian (Jackie) Jiang <17555551+jackie-ji...@users.noreply.github.com> AuthorDate: Thu Jun 12 11:08:48 2025 -0600 [MSE] For constant expression query, solve it with a single random server (#16083) --- .../MultiNodesOfflineClusterIntegrationTest.java | 12 +++ .../apache/pinot/query/routing/WorkerManager.java | 97 +++++++++++++--------- 2 files changed, 69 insertions(+), 40 deletions(-) diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/MultiNodesOfflineClusterIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/MultiNodesOfflineClusterIntegrationTest.java index e73f47fe0b..91761b31c8 100644 --- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/MultiNodesOfflineClusterIntegrationTest.java +++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/MultiNodesOfflineClusterIntegrationTest.java @@ -288,6 +288,18 @@ public class MultiNodesOfflineClusterIntegrationTest extends OfflineClusterInteg assertEquals(row.get(1).doubleValue(), 725560.0 / 444); } + @Test + public void testConstantExpressionQuery() + throws Exception { + setUseMultiStageQueryEngine(true); + + JsonNode result = postQuery("SELECT 1"); + assertEquals(result.get("numServersQueried").intValue(), 1); + + result = postQuery("SELECT DaysSinceEpoch, AVG(CRSArrTime) FROM mytable WHERE false GROUP BY 1 ORDER BY 2 DESC"); + assertEquals(result.get("numServersQueried").intValue(), 1); + } + // Disabled because segments might not be server partitioned with multiple servers @Test(enabled = false) @Override diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/routing/WorkerManager.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/routing/WorkerManager.java index 4a28f11632..c35799d69a 100644 --- a/pinot-query-planner/src/main/java/org/apache/pinot/query/routing/WorkerManager.java +++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/routing/WorkerManager.java @@ -22,9 +22,11 @@ import com.google.common.base.Preconditions; import com.google.common.collect.Maps; import java.util.ArrayList; import java.util.Arrays; +import java.util.Collection; import java.util.Collections; import java.util.HashMap; import java.util.HashSet; +import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Random; @@ -191,6 +193,25 @@ public class WorkerManager { Map<Integer, DispatchablePlanMetadata> metadataMap = context.getDispatchablePlanMetadataMap(); DispatchablePlanMetadata metadata = metadataMap.get(fragment.getFragmentId()); + if (context.getTableNames().isEmpty()) { + // For constant expression query (no table is accessed), assign it to a random enabled server. + // TODO: Consider short-circuiting it and directly calculating the result on broker. + + Collection<ServerInstance> serverInstances = _routingManager.getEnabledServerInstanceMap().values(); + int numServers = serverInstances.size(); + if (numServers == 0) { + LOGGER.error("[RequestId: {}] No server instance found for constant expression query", context.getRequestId()); + throw new IllegalStateException("No server instance found for constant expression query"); + } + int index = RANDOM.nextInt(numServers); + Iterator<ServerInstance> iterator = serverInstances.iterator(); + for (int i = 0; i < index; i++) { + iterator.next(); + } + metadata.setWorkerIdToServerInstanceMap(Map.of(0, new QueryServerInstance(iterator.next()))); + return; + } + if (isPrePartitionAssignment(children, metadataMap)) { // If all the children are pre-partitioned the same way, use local exchange. DispatchablePlanMetadata firstChildMetadata = metadataMap.get(children.get(0).getFragmentId()); @@ -200,8 +221,8 @@ public class WorkerManager { } if (metadata.isRequiresSingletonInstance()) { - // When singleton instance is required, return a single worker with ID 0. - List<ServerInstance> serverInstances = assignServerInstances(context); + // When singleton instance is required, assign it to a random candidate server. + List<ServerInstance> serverInstances = getCandidateServers(context); metadata.setWorkerIdToServerInstanceMap( Map.of(0, new QueryServerInstance(serverInstances.get(RANDOM.nextInt(serverInstances.size()))))); return; @@ -221,7 +242,7 @@ public class WorkerManager { // If there is no local exchange, assign workers to the servers hosting the tables List<ServerInstance> serverInstances = null; if (workerIdToServerInstanceMap == null) { - serverInstances = assignServerInstances(context); + serverInstances = getCandidateServers(context); int stageParallelism = Integer.parseInt( context.getPlannerContext().getOptions().getOrDefault(QueryOptionKey.STAGE_PARALLELISM, "1")); workerIdToServerInstanceMap = Maps.newHashMapWithExpectedSize(serverInstances.size() * stageParallelism); @@ -319,48 +340,44 @@ public class WorkerManager { /** * Returns the servers serving any segment of the tables in the query. */ - private List<ServerInstance> assignServerInstances(DispatchablePlanContext context) { + private List<ServerInstance> getCandidateServers(DispatchablePlanContext context) { List<ServerInstance> serverInstances; Set<String> tableNames = context.getTableNames(); + assert tableNames != null; Map<String, ServerInstance> enabledServerInstanceMap = _routingManager.getEnabledServerInstanceMap(); - if (tableNames.isEmpty()) { - // TODO: Short circuit it when no table needs to be scanned - // This could be the case from queries that don't actually fetch values from the tables. In such cases the - // routing need not be tenant aware. - // Eg: SELECT 1 AS one FROM select_having_expression_test_test_having HAVING 1 > 2; - serverInstances = new ArrayList<>(enabledServerInstanceMap.values()); - } else { - Set<String> servers = new HashSet<>(); - for (String tableName : tableNames) { - TableType tableType = TableNameBuilder.getTableTypeFromTableName(tableName); - if (tableType == null) { - Set<String> offlineTableServers = _routingManager.getServingInstances( - TableNameBuilder.forType(TableType.OFFLINE).tableNameWithType(tableName)); - if (offlineTableServers != null) { - servers.addAll(offlineTableServers); - } - Set<String> realtimeTableServers = _routingManager.getServingInstances( - TableNameBuilder.forType(TableType.REALTIME).tableNameWithType(tableName)); - if (realtimeTableServers != null) { - servers.addAll(realtimeTableServers); - } - } else { - Set<String> tableServers = _routingManager.getServingInstances(tableName); - if (tableServers != null) { - servers.addAll(tableServers); - } + Set<String> servers = new HashSet<>(); + for (String tableName : tableNames) { + TableType tableType = TableNameBuilder.getTableTypeFromTableName(tableName); + if (tableType == null) { + Set<String> offlineTableServers = _routingManager.getServingInstances( + TableNameBuilder.forType(TableType.OFFLINE).tableNameWithType(tableName)); + if (offlineTableServers != null) { + servers.addAll(offlineTableServers); + } + Set<String> realtimeTableServers = _routingManager.getServingInstances( + TableNameBuilder.forType(TableType.REALTIME).tableNameWithType(tableName)); + if (realtimeTableServers != null) { + servers.addAll(realtimeTableServers); } - } - if (servers.isEmpty()) { - // fall back to use all enabled servers if no server is found for the tables - serverInstances = new ArrayList<>(enabledServerInstanceMap.values()); } else { - serverInstances = new ArrayList<>(servers.size()); - for (String server : servers) { - ServerInstance serverInstance = enabledServerInstanceMap.get(server); - if (serverInstance != null) { - serverInstances.add(serverInstance); - } + Set<String> tableServers = _routingManager.getServingInstances(tableName); + if (tableServers != null) { + servers.addAll(tableServers); + } + } + } + if (servers.isEmpty()) { + // Fall back to use all enabled servers if no server is found for the tables. + // TODO: Revisit if we should throw an exception instead. + LOGGER.warn("[RequestId: {}] No server instance found for intermediate stage for tables: {}, " + + "falling back to all enabled servers", context.getRequestId(), tableNames); + serverInstances = new ArrayList<>(enabledServerInstanceMap.values()); + } else { + serverInstances = new ArrayList<>(servers.size()); + for (String server : servers) { + ServerInstance serverInstance = enabledServerInstanceMap.get(server); + if (serverInstance != null) { + serverInstances.add(serverInstance); } } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org