This is an automated email from the ASF dual-hosted git repository. ankitsultana pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push: new 180823d674 [multistage] Add Support for Values in Physical Optimizer (#16221) 180823d674 is described below commit 180823d6743293b20d298eff7b4a0e7b2085faae Author: Ankit Sultana <ankitsult...@uber.com> AuthorDate: Tue Jul 1 19:53:28 2025 -0500 [multistage] Add Support for Values in Physical Optimizer (#16221) --- .../query/context/PhysicalPlannerContext.java | 33 +++++++- .../v2/opt/rules/LiteModeWorkerAssignmentRule.java | 42 +--------- .../v2/opt/rules/WorkerExchangeAssignmentRule.java | 9 +- .../query/context/PhysicalPlannerContextTest.java | 98 ++++++++++++++++++++++ .../rules/LiteModeWorkerAssignmentRuleTest.java | 24 ------ .../resources/queries/PhysicalOptimizerPlans.json | 57 +++++++++++++ .../query/service/dispatch/QueryDispatcher.java | 2 +- .../src/test/resources/queries/BasicQuery.json | 4 + 8 files changed, 201 insertions(+), 68 deletions(-) diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/context/PhysicalPlannerContext.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/context/PhysicalPlannerContext.java index b6c9d2e2e8..e8e46b3940 100644 --- a/pinot-query-planner/src/main/java/org/apache/pinot/query/context/PhysicalPlannerContext.java +++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/context/PhysicalPlannerContext.java @@ -18,9 +18,13 @@ */ package org.apache.pinot.query.context; +import com.google.common.base.Preconditions; import java.util.HashMap; import java.util.Map; +import java.util.Random; +import java.util.concurrent.ThreadLocalRandom; import java.util.function.Supplier; +import java.util.stream.Collectors; import javax.annotation.Nullable; import org.apache.pinot.common.utils.config.QueryOptionsUtils; import org.apache.pinot.core.routing.RoutingManager; @@ -80,9 +84,16 @@ public class PhysicalPlannerContext { _liteModeServerStageLimit = CommonConstants.Broker.DEFAULT_LITE_MODE_LEAF_STAGE_LIMIT; } + public 
PhysicalPlannerContext(RoutingManager routingManager, String hostName, int port, long requestId, + String instanceId, Map<String, String> queryOptions) { + this(routingManager, hostName, port, requestId, instanceId, queryOptions, + CommonConstants.Broker.DEFAULT_USE_LITE_MODE, CommonConstants.Broker.DEFAULT_RUN_IN_BROKER, + CommonConstants.Broker.DEFAULT_USE_BROKER_PRUNING, CommonConstants.Broker.DEFAULT_LITE_MODE_LEAF_STAGE_LIMIT); + } + public PhysicalPlannerContext(RoutingManager routingManager, String hostName, int port, long requestId, String instanceId, Map<String, String> queryOptions, boolean defaultUseLiteMode, boolean defaultRunInBroker, - boolean defaultUseBrokerPruning, int defaultLiteModeServerStageLimit) { + boolean defaultUseBrokerPruning, int defaultLiteModeLeafStageLimit) { _routingManager = routingManager; _hostName = hostName; _port = port; @@ -93,7 +104,7 @@ public class PhysicalPlannerContext { _runInBroker = QueryOptionsUtils.isRunInBroker(_queryOptions, defaultRunInBroker); _useBrokerPruning = QueryOptionsUtils.isUseBrokerPruning(_queryOptions, defaultUseBrokerPruning); _liteModeServerStageLimit = QueryOptionsUtils.getLiteModeServerStageLimit(_queryOptions, - defaultLiteModeServerStageLimit); + defaultLiteModeLeafStageLimit); _instanceIdToQueryServerInstance.put(instanceId, getBrokerQueryServerInstance()); } @@ -146,6 +157,24 @@ public class PhysicalPlannerContext { return _liteModeServerStageLimit; } + /** + * Gets a random instance id from the registered instances in the context. + * <p> + * <b>Important:</b> This method will always return a server instanceId, unless no server has yet been registered + * with the context, which could happen for queries which don't consist of any table-scans. 
+ * </p> + */ + public String getRandomInstanceId() { + Preconditions.checkState(!_instanceIdToQueryServerInstance.isEmpty(), "No instances present in context"); + if (_instanceIdToQueryServerInstance.size() == 1) { + return _instanceIdToQueryServerInstance.keySet().iterator().next(); + } + int numCandidates = _instanceIdToQueryServerInstance.size() - 1; // all entries except this broker's own id + Random random = ThreadLocalRandom.current(); + return _instanceIdToQueryServerInstance.keySet().stream().filter(instanceId -> !_instanceId.equals(instanceId)) + .collect(Collectors.toList()).get(random.nextInt(numCandidates)); // bound is exclusive: every candidate reachable + } + private QueryServerInstance getBrokerQueryServerInstance() { return new QueryServerInstance(_instanceId, _hostName, _port, _port); } diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/v2/opt/rules/LiteModeWorkerAssignmentRule.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/v2/opt/rules/LiteModeWorkerAssignmentRule.java index 2ebaa7a837..e3cf4379bb 100644 --- a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/v2/opt/rules/LiteModeWorkerAssignmentRule.java +++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/v2/opt/rules/LiteModeWorkerAssignmentRule.java @@ -18,15 +18,10 @@ */ package org.apache.pinot.query.planner.physical.v2.opt.rules; -import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; import java.util.ArrayList; import java.util.Collections; -import java.util.HashSet; import java.util.List; -import java.util.Random; -import java.util.Set; -import java.util.stream.Collectors; import javax.annotation.Nullable; import org.apache.calcite.plan.RelTraitSet; import org.apache.calcite.rel.RelCollation; @@ -50,7 +45,6 @@ import org.apache.pinot.query.planner.physical.v2.opt.PRelNodeTransformer; * plan nodes. 
*/ public class LiteModeWorkerAssignmentRule implements PRelNodeTransformer { - private static final Random RANDOM = new Random(); private final PhysicalPlannerContext _context; private final boolean _runInBroker; @@ -61,13 +55,11 @@ public class LiteModeWorkerAssignmentRule implements PRelNodeTransformer { @Override public PRelNode execute(PRelNode currentNode) { - Set<String> workerSet = new HashSet<>(); List<String> workers; if (_runInBroker) { - workers = List.of(String.format("0@%s", _context.getInstanceId())); + workers = List.of("0@" + _context.getInstanceId()); } else { - accumulateWorkers(currentNode, workerSet); - workers = List.of(sampleWorker(new ArrayList<>(workerSet))); + workers = List.of("0@" + _context.getRandomInstanceId()); } return addExchangeAndWorkers(currentNode, null, workers); } @@ -98,36 +90,6 @@ public class LiteModeWorkerAssignmentRule implements PRelNodeTransformer { return currentNode; } - /** - * Stores workers assigned to the leaf stage nodes into the provided Set. Note that each worker has an integer prefix - * which denotes the "workerId". We remove that prefix before storing them in the set. - */ - @VisibleForTesting - static void accumulateWorkers(PRelNode currentNode, Set<String> workerSink) { - if (currentNode.isLeafStage()) { - workerSink.addAll(currentNode.getPinotDataDistributionOrThrow().getWorkers().stream() - .map(LiteModeWorkerAssignmentRule::stripIdPrefixFromWorker).collect(Collectors.toList())); - return; - } - for (PRelNode input : currentNode.getPRelInputs()) { - accumulateWorkers(input, workerSink); - } - } - - /** - * Samples a worker from the given list. 
- */ - @VisibleForTesting - static String sampleWorker(List<String> instanceIds) { - Preconditions.checkState(!instanceIds.isEmpty(), "No workers in leaf stage"); - return String.format("0@%s", instanceIds.get(RANDOM.nextInt(instanceIds.size()))); - } - - @VisibleForTesting - static String stripIdPrefixFromWorker(String worker) { - return worker.split("@")[1]; - } - /** * Infers Exchange to be added on top of the leaf stage. */ diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/v2/opt/rules/WorkerExchangeAssignmentRule.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/v2/opt/rules/WorkerExchangeAssignmentRule.java index 0ec60011df..8bffea3093 100644 --- a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/v2/opt/rules/WorkerExchangeAssignmentRule.java +++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/v2/opt/rules/WorkerExchangeAssignmentRule.java @@ -38,6 +38,7 @@ import org.apache.calcite.rel.RelNode; import org.apache.calcite.rel.core.Aggregate; import org.apache.calcite.rel.core.Join; import org.apache.calcite.rel.core.Sort; +import org.apache.calcite.rel.core.Values; import org.apache.pinot.calcite.rel.hint.PinotHintOptions; import org.apache.pinot.calcite.rel.hint.PinotHintStrategyTable; import org.apache.pinot.calcite.rel.traits.PinotExecStrategyTrait; @@ -394,12 +395,18 @@ public class WorkerExchangeAssignmentRule implements PRelNodeTransformer { * Computes the PinotDataDistribution of the given node from the input node. This assumes that all traits of the * input node are already satisfied. 
*/ - private static PinotDataDistribution computeCurrentNodeDistribution(PRelNode currentNode, @Nullable PRelNode parent) { + private PinotDataDistribution computeCurrentNodeDistribution(PRelNode currentNode, @Nullable PRelNode parent) { if (currentNode.getPinotDataDistribution() != null) { Preconditions.checkState(isLeafStageBoundary(currentNode, parent), "current node should not have assigned data distribution unless it's a boundary"); return currentNode.getPinotDataDistributionOrThrow(); } + if (currentNode.getPRelInputs().isEmpty()) { + Preconditions.checkState(currentNode.unwrap() instanceof Values, "Expected Values node. Found: %s", + currentNode.unwrap()); + List<String> workers = List.of(String.format("0@%s", _physicalPlannerContext.getRandomInstanceId())); + return new PinotDataDistribution(RelDistribution.Type.SINGLETON, workers, workers.hashCode(), null, null); + } PinotDataDistribution inputDistribution = currentNode.getPRelInput(0).getPinotDataDistributionOrThrow(); PinotDataDistribution newDistribution = inputDistribution.apply(DistMappingGenerator.compute( currentNode.unwrap().getInput(0), currentNode.unwrap(), null), diff --git a/pinot-query-planner/src/test/java/org/apache/pinot/query/context/PhysicalPlannerContextTest.java b/pinot-query-planner/src/test/java/org/apache/pinot/query/context/PhysicalPlannerContextTest.java new file mode 100644 index 0000000000..427ce8ade1 --- /dev/null +++ b/pinot-query-planner/src/test/java/org/apache/pinot/query/context/PhysicalPlannerContextTest.java @@ -0,0 +1,98 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.query.context; + +import java.util.Map; +import org.apache.pinot.core.routing.RoutingManager; +import org.apache.pinot.query.routing.QueryServerInstance; +import org.testng.Assert; +import org.testng.annotations.Test; + +import static org.mockito.Mockito.mock; + +public class PhysicalPlannerContextTest { + + @Test + public void testGetRandomInstanceIdWithNoInstances() { + // Test case: No instances present in context (should throw IllegalStateException) + PhysicalPlannerContext context = createPhysicalPlannerContext(); + + // Clear the instances map to simulate no instances + context.getInstanceIdToQueryServerInstance().clear(); + + try { + context.getRandomInstanceId(); + Assert.fail("Expected IllegalStateException when no instances are present"); + } catch (IllegalStateException e) { + Assert.assertEquals(e.getMessage(), "No instances present in context"); + } + } + + @Test + public void testGetRandomInstanceIdWithSingleInstance() { + // Test case: Only one instance present (should return that instance) + PhysicalPlannerContext context = createPhysicalPlannerContext(); + + // The constructor automatically adds the broker instance, so we should have exactly one + String randomInstanceId = context.getRandomInstanceId(); + Assert.assertEquals(randomInstanceId, "broker_instance_1"); + } + + @Test + public void testGetRandomInstanceIdWithMultipleInstances() { + // Test case: Multiple instances present (should return one that's not the current instance) + PhysicalPlannerContext context = createPhysicalPlannerContext(); + 
+ // Add additional server instances + QueryServerInstance serverInstance2 = new QueryServerInstance("server_instance_2", "host2", 8081, 8081); + QueryServerInstance serverInstance3 = new QueryServerInstance("server_instance_3", "host3", 8082, 8082); + + context.getInstanceIdToQueryServerInstance().put("server_instance_2", serverInstance2); + context.getInstanceIdToQueryServerInstance().put("server_instance_3", serverInstance3); + + // Call getRandomInstanceId multiple times to verify it returns different server instances + // but never the broker instance + for (int i = 0; i < 10; i++) { + String randomInstanceId = context.getRandomInstanceId(); + Assert.assertNotEquals(randomInstanceId, "broker_instance_1", + "Random instance should not be the current broker instance"); + Assert.assertTrue(randomInstanceId.equals("server_instance_2") || randomInstanceId.equals("server_instance_3"), + "Random instance should be one of the server instances"); + } + } + + @Test + public void testGetRandomInstanceIdWithTwoInstances() { + // Test case: Two instances (broker + one server) - should return the server + PhysicalPlannerContext context = createPhysicalPlannerContext(); + + // Add one server instance + QueryServerInstance serverInstance = new QueryServerInstance("server_instance_1", "host1", 8081, 8081); + context.getInstanceIdToQueryServerInstance().put("server_instance_1", serverInstance); + + String randomInstanceId = context.getRandomInstanceId(); + Assert.assertEquals(randomInstanceId, "server_instance_1", + "With two instances, should return the non-broker instance"); + } + + private PhysicalPlannerContext createPhysicalPlannerContext() { + RoutingManager mockRoutingManager = mock(RoutingManager.class); + return new PhysicalPlannerContext(mockRoutingManager, "localhost", 8080, 12345L, "broker_instance_1", Map.of()); + } +} diff --git a/pinot-query-planner/src/test/java/org/apache/pinot/query/planner/physical/v2/opt/rules/LiteModeWorkerAssignmentRuleTest.java 
b/pinot-query-planner/src/test/java/org/apache/pinot/query/planner/physical/v2/opt/rules/LiteModeWorkerAssignmentRuleTest.java index dd70f415d8..7557d5bb07 100644 --- a/pinot-query-planner/src/test/java/org/apache/pinot/query/planner/physical/v2/opt/rules/LiteModeWorkerAssignmentRuleTest.java +++ b/pinot-query-planner/src/test/java/org/apache/pinot/query/planner/physical/v2/opt/rules/LiteModeWorkerAssignmentRuleTest.java @@ -18,39 +18,15 @@ */ package org.apache.pinot.query.planner.physical.v2.opt.rules; -import java.util.HashSet; import java.util.List; -import java.util.Set; import org.apache.pinot.query.planner.physical.v2.PRelNode; import org.apache.pinot.query.planner.physical.v2.PinotDataDistribution; import org.mockito.Mockito; -import org.testng.annotations.Test; import static org.mockito.Mockito.doReturn; -import static org.testng.Assert.*; public class LiteModeWorkerAssignmentRuleTest { - @Test - public void testAccumulateWorkers() { - PRelNode leafOne = create(List.of(), true, List.of("0@server-1", "1@server-2")); - PRelNode leafTwo = create(List.of(), true, List.of("0@server-2", "1@server-1")); - PRelNode intermediateNode = create(List.of(leafOne, leafTwo), false, List.of("0@server-3", "1@server-4")); - Set<String> workers = new HashSet<>(); - LiteModeWorkerAssignmentRule.accumulateWorkers(intermediateNode, workers); - assertEquals(workers, Set.of("server-1", "server-2")); - } - - @Test - public void testSampleWorker() { - List<String> workers = List.of("worker-0", "worker-1", "worker-2"); - Set<String> selectionCandidates = Set.of("0@worker-0", "0@worker-1", "0@worker-2"); - Set<String> selectedWorkers = new HashSet<>(); - for (int iteration = 0; iteration < 1000; iteration++) { - selectedWorkers.add(LiteModeWorkerAssignmentRule.sampleWorker(workers)); - } - assertEquals(selectedWorkers, selectionCandidates); - } private PRelNode create(List<PRelNode> inputs, boolean isLeafStage, List<String> workers) { // Setup mock pinot data distribution. 
diff --git a/pinot-query-planner/src/test/resources/queries/PhysicalOptimizerPlans.json b/pinot-query-planner/src/test/resources/queries/PhysicalOptimizerPlans.json index 96c95f4498..bd5137c837 100644 --- a/pinot-query-planner/src/test/resources/queries/PhysicalOptimizerPlans.json +++ b/pinot-query-planner/src/test/resources/queries/PhysicalOptimizerPlans.json @@ -1,4 +1,61 @@ { + "physical_opt_constant_queries": { + "queries": [ + { + "description": "Select 1", + "sql": "SET usePhysicalOptimizer=true; EXPLAIN PLAN FOR SELECT 1", + "output": [ + "Execution Plan", + "\nPhysicalValues(tuples=[[{ 1 }]])", + "\n" + ] + }, + { + "description": "Constant only join query", + "sql": "SET usePhysicalOptimizer=true; EXPLAIN PLAN FOR WITH tmp1(id, name) AS (VALUES(1, 'foo')), tmp2(user_id, nm2) AS (VALUES(1, 'bar')) SELECT * FROM tmp1 JOIN tmp2 ON 1=1", + "output": [ + "Execution Plan", + "\nPhysicalJoin(condition=[true], joinType=[inner])", + "\n PhysicalValues(tuples=[[{ 1, _UTF-8'foo' }]])", + "\n PhysicalValues(tuples=[[{ 1, _UTF-8'bar' }]])", + "\n" + ] + }, + { + "description": "Query that gets optimized to a Values node", + "sql": "SET usePhysicalOptimizer=true; EXPLAIN PLAN FOR SELECT * FROM a WHERE col1 IS NULL LIMIT 1", + "output": [ + "Execution Plan", + "\nPhysicalSort(fetch=[1])", + "\n PhysicalValues(tuples=[[]])", + "\n" + ] + }, + { + "description": "Constant only join query", + "sql": "SET usePhysicalOptimizer=true; EXPLAIN PLAN FOR SELECT COUNT(*) FROM a WHERE col1 IN (WITH tmp1(id, name) AS (VALUES(1, 'foo')), tmp2(user_id, nm) AS (VALUES(2, 'bar')) SELECT A.name FROM tmp1 AS A JOIN tmp2 AS B ON A.id = B.user_id)", + "output": [ + "Execution Plan", + "\nPhysicalExchange(exchangeStrategy=[SINGLETON_EXCHANGE])", + "\n PhysicalAggregate(group=[{}], agg#0=[COUNT($0)], aggType=[FINAL])", + "\n PhysicalExchange(exchangeStrategy=[SINGLETON_EXCHANGE])", + "\n PhysicalAggregate(group=[{}], agg#0=[COUNT()], aggType=[LEAF])", + "\n PhysicalJoin(condition=[=($0, $1)], 
joinType=[semi])", + "\n PhysicalExchange(exchangeStrategy=[PARTITIONING_EXCHANGE], distKeys=[[0]])", + "\n PhysicalProject(col1=[$0])", + "\n PhysicalFilter(condition=[=($0, _UTF-8'foo')])", + "\n PhysicalTableScan(table=[[default, a]])", + "\n PhysicalExchange(exchangeStrategy=[PARTITIONING_EXCHANGE], distKeys=[[0]])", + "\n PhysicalProject(name=[$1])", + "\n PhysicalJoin(condition=[=($0, $2)], joinType=[inner])", + "\n PhysicalValues(tuples=[[]])", + "\n PhysicalProject(EXPR$0=[$0])", + "\n PhysicalValues(tuples=[[]])", + "\n" + ] + } + ] + }, "physical_opt_chained_subqueries": { "queries": [ { diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/dispatch/QueryDispatcher.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/dispatch/QueryDispatcher.java index d7ef6790d2..68cac7e9ce 100644 --- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/dispatch/QueryDispatcher.java +++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/dispatch/QueryDispatcher.java @@ -320,7 +320,7 @@ public class QueryDispatcher { serializePlanFragments(stagePlans, serverInstancesOut, deadline); if (serverInstancesOut.isEmpty()) { - throw new RuntimeException("No server instances to dispatch query to"); + return; } Map<String, String> requestMetadata = diff --git a/pinot-query-runtime/src/test/resources/queries/BasicQuery.json b/pinot-query-runtime/src/test/resources/queries/BasicQuery.json index eb8e8d53d6..71f239db5e 100644 --- a/pinot-query-runtime/src/test/resources/queries/BasicQuery.json +++ b/pinot-query-runtime/src/test/resources/queries/BasicQuery.json @@ -20,6 +20,10 @@ { "description": "basic test with literal", "sql": "SELECT 1 AS int, CAST(2 AS DOUBLE) AS double" + }, + { + "description": "select 1 but alias to a reserved column name", + "sql": "SELECT 1 as \"timestamp\"" } ] }, --------------------------------------------------------------------- To unsubscribe, e-mail: 
commits-unsubscribe@pinot.apache.org For additional commands, e-mail: commits-help@pinot.apache.org