This is an automated email from the ASF dual-hosted git repository. jackie pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push: new 2d41b3806e [Multi-stage] Optimize mailbox info in query plan (#12382) 2d41b3806e is described below commit 2d41b3806e14b4d9822c6eddfa57acf613356db2 Author: Xiaotian (Jackie) Jiang <17555551+jackie-ji...@users.noreply.github.com> AuthorDate: Thu Feb 8 15:31:58 2024 -0800 [Multi-stage] Optimize mailbox info in query plan (#12382) --- pinot-common/src/main/proto/worker.proto | 25 ++- .../explain/PhysicalExplainPlanVisitor.java | 35 +-- .../planner/physical/DispatchablePlanContext.java | 8 +- .../planner/physical/DispatchablePlanMetadata.java | 8 +- .../planner/physical/MailboxAssignmentVisitor.java | 124 ++++++----- .../query/planner/physical/MailboxIdUtils.java | 43 ++-- .../{MailboxMetadata.java => MailboxInfo.java} | 39 ++-- .../apache/pinot/query/routing/MailboxInfos.java | 35 ++- .../pinot/query/routing}/QueryPlanSerDeUtils.java | 64 +++--- .../apache/pinot/query/routing/RoutingInfo.java | 38 ++-- .../pinot/query/routing/SharedMailboxInfos.java | 38 ++-- .../apache/pinot/query/routing}/StageMetadata.java | 11 +- .../org/apache/pinot/query/routing}/StagePlan.java | 10 +- .../apache/pinot/query/routing/WorkerMetadata.java | 24 +-- .../apache/pinot/query/QueryCompilationTest.java | 239 ++++++++++----------- .../apache/pinot/query/runtime/QueryRunner.java | 49 +++-- .../operator/BaseMailboxReceiveOperator.java | 10 +- .../runtime/operator/MailboxSendOperator.java | 26 +-- .../runtime/plan/OpChainExecutionContext.java | 18 +- .../plan/pipeline/PipelineBreakerExecutor.java | 6 +- .../plan/server/ServerPlanRequestContext.java | 2 +- .../plan/server/ServerPlanRequestUtils.java | 4 +- .../query/service/dispatch/QueryDispatcher.java | 16 +- .../pinot/query/service/server/QueryServer.java | 10 +- .../apache/pinot/query/QueryServerEnclosure.java | 2 +- .../executor/OpChainSchedulerServiceTest.java | 17 +- .../operator/MailboxReceiveOperatorTest.java | 29 ++- .../runtime/operator/MailboxSendOperatorTest.java | 21 +- .../pinot/query/runtime/operator/OpChainTest.java | 58 ++--- .../query/runtime/operator/OperatorTestUtil.java | 17 +- .../operator/SortedMailboxReceiveOperatorTest.java | 29 ++- .../plan/pipeline/PipelineBreakerExecutorTest.java | 33 +-- .../query/runtime/queries/QueryRunnerTestBase.java | 10 +- .../query/service/server/QueryServerTest.java | 18 +- 34 files changed, 543 insertions(+), 573 deletions(-) diff --git a/pinot-common/src/main/proto/worker.proto b/pinot-common/src/main/proto/worker.proto index b7e492fcc5..8ad277c96d 100644 --- a/pinot-common/src/main/proto/worker.proto +++ b/pinot-common/src/main/proto/worker.proto @@ -49,25 +49,30 @@ message QueryResponse { } message StagePlan { - int32 stageId = 1; - bytes rootNode = 2; // Serialized StageNode - StageMetadata stageMetadata = 3; + bytes rootNode = 1; // Serialized StageNode + StageMetadata stageMetadata = 2; } message StageMetadata { - repeated WorkerMetadata workerMetadata = 1; - bytes customProperty = 2; // Serialized Properties + int32 stageId = 1; + repeated WorkerMetadata workerMetadata = 2; + bytes customProperty = 3; // Serialized Properties } message WorkerMetadata { - string virtualAddress = 1; - map<int32, MailboxMetadata> mailboxMetadata = 2; + int32 workedId = 1; + map<int32, bytes> mailboxInfos = 2; // Stage id to serialized MailboxInfos map<string, string> customProperty = 3; } -message MailboxMetadata { - repeated string mailboxId = 1; - repeated string virtualAddress = 2; +message MailboxInfos { + repeated MailboxInfo mailboxInfo = 1; +} + +message MailboxInfo { + string hostname = 1; + int32 port = 2; + repeated int32 workerId = 3; } message Properties { diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/explain/PhysicalExplainPlanVisitor.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/explain/PhysicalExplainPlanVisitor.java index ea9bef1139..61dd1a23f0 100644 --- a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/explain/PhysicalExplainPlanVisitor.java +++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/explain/PhysicalExplainPlanVisitor.java @@ -18,12 +18,10 @@ */ package org.apache.pinot.query.planner.explain; -import java.util.ArrayList; import java.util.Comparator; import java.util.Iterator; import java.util.List; import java.util.Map; -import java.util.TreeMap; import java.util.stream.Collectors; import org.apache.pinot.query.planner.physical.DispatchablePlanFragment; import org.apache.pinot.query.planner.physical.DispatchableSubPlan; @@ -41,8 +39,8 @@ import org.apache.pinot.query.planner.plannode.SortNode; import org.apache.pinot.query.planner.plannode.TableScanNode; import org.apache.pinot.query.planner.plannode.ValueNode; import org.apache.pinot.query.planner.plannode.WindowNode; +import org.apache.pinot.query.routing.MailboxInfo; import org.apache.pinot.query.routing.QueryServerInstance; -import org.apache.pinot.query.routing.VirtualServerAddress; /** @@ -214,14 +212,13 @@ public class PhysicalExplainPlanVisitor implements PlanNodeVisitor<StringBuilder appendInfo(node, context); int receiverStageId = node.getReceiverStageId(); - List<VirtualServerAddress> serverAddressList = + List<MailboxInfo> receiverMailboxInfos = _dispatchableSubPlan.getQueryStageList().get(node.getPlanFragmentId()).getWorkerMetadataList() - .get(context._workerId).getMailboxMetadataMap().get(receiverStageId).getVirtualAddresses(); - List<String> serverInstanceToWorkerIdList = stringifyVirtualServerAddresses(serverAddressList); + .get(context._workerId).getMailboxInfosMap().get(receiverStageId).getMailboxInfos(); context._builder.append("->"); - String receivers = serverInstanceToWorkerIdList.stream() - .map(s -> "[" + receiverStageId + "]@" + s) - .collect(Collectors.joining(",", "{", "}")); + // Sort to ensure print order + String receivers = receiverMailboxInfos.stream().sorted(Comparator.comparingInt(MailboxInfo::getPort)) + .map(v -> "[" + receiverStageId + "]@" + v).collect(Collectors.joining(",", "{", "}")); return context._builder.append(receivers); } @@ -276,24 +273,4 @@ public class PhysicalExplainPlanVisitor implements PlanNodeVisitor<StringBuilder ); } } - - public static List<String> stringifyVirtualServerAddresses(List<VirtualServerAddress> serverAddressList) { - // using tree map to ensure print order. - Map<QueryServerInstance, List<Integer>> serverToWorkerIdMap = new TreeMap<>( - Comparator.comparing(QueryServerInstance::toString)); - for (VirtualServerAddress serverAddress : serverAddressList) { - QueryServerInstance server = new QueryServerInstance(serverAddress.hostname(), serverAddress.port(), -1); - List<Integer> workerIds = serverToWorkerIdMap.getOrDefault(server, new ArrayList<>()); - workerIds.add(serverAddress.workerId()); - serverToWorkerIdMap.put(server, workerIds); - } - return serverToWorkerIdMap.entrySet().stream() - .map(PhysicalExplainPlanVisitor::stringifyQueryServerInstanceToWorkerIdsEntry) - .collect(Collectors.toList()); - } - - public static String stringifyQueryServerInstanceToWorkerIdsEntry(Map.Entry<QueryServerInstance, List<Integer>> e) { - QueryServerInstance server = e.getKey(); - return server.getHostname() + ":" + server.getQueryServicePort() + "|" + e.getValue(); - } } diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/DispatchablePlanContext.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/DispatchablePlanContext.java index 22744dda0e..6d421e90d9 100644 --- a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/DispatchablePlanContext.java +++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/DispatchablePlanContext.java @@ -29,9 +29,8 @@ import org.apache.calcite.util.Pair; import org.apache.pinot.query.context.PlannerContext; import org.apache.pinot.query.planner.PlanFragment; import org.apache.pinot.query.planner.plannode.PlanNode; -import org.apache.pinot.query.routing.MailboxMetadata; +import org.apache.pinot.query.routing.MailboxInfos; import org.apache.pinot.query.routing.QueryServerInstance; -import org.apache.pinot.query.routing.VirtualServerAddress; import org.apache.pinot.query.routing.WorkerManager; import org.apache.pinot.query.routing.WorkerMetadata; @@ -100,7 +99,7 @@ public class DispatchablePlanContext { dispatchablePlanMetadata.getWorkerIdToServerInstanceMap(); Map<Integer, Map<String, List<String>>> workerIdToSegmentsMap = dispatchablePlanMetadata.getWorkerIdToSegmentsMap(); - Map<Integer, Map<Integer, MailboxMetadata>> workerIdToMailboxesMap = + Map<Integer, Map<Integer, MailboxInfos>> workerIdToMailboxesMap = dispatchablePlanMetadata.getWorkerIdToMailboxesMap(); Map<QueryServerInstance, List<Integer>> serverInstanceToWorkerIdsMap = new HashMap<>(); WorkerMetadata[] workerMetadataArray = new WorkerMetadata[workerIdToServerInstanceMap.size()]; @@ -108,8 +107,7 @@ public class DispatchablePlanContext { int workerId = serverEntry.getKey(); QueryServerInstance queryServerInstance = serverEntry.getValue(); serverInstanceToWorkerIdsMap.computeIfAbsent(queryServerInstance, k -> new ArrayList<>()).add(workerId); - WorkerMetadata workerMetadata = new WorkerMetadata(new VirtualServerAddress(queryServerInstance, workerId), - workerIdToMailboxesMap.get(workerId)); + WorkerMetadata workerMetadata = new WorkerMetadata(workerId, workerIdToMailboxesMap.get(workerId)); if (workerIdToSegmentsMap != null) { workerMetadata.setTableSegmentsMap(workerIdToSegmentsMap.get(workerId)); } diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/DispatchablePlanMetadata.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/DispatchablePlanMetadata.java index 5415ca7019..db68010103 100644 --- a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/DispatchablePlanMetadata.java +++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/DispatchablePlanMetadata.java @@ -28,7 +28,7 @@ import java.util.Map; import java.util.Set; import javax.annotation.Nullable; import org.apache.pinot.core.routing.TimeBoundaryInfo; -import org.apache.pinot.query.routing.MailboxMetadata; +import org.apache.pinot.query.routing.MailboxInfos; import org.apache.pinot.query.routing.QueryServerInstance; @@ -54,6 +54,8 @@ public class DispatchablePlanMetadata implements Serializable { // info from PlanNode that requires singleton (e.g. SortNode/AggregateNode) private boolean _requiresSingletonInstance; + // TODO: Change the following maps to lists + // -------------------------------------------------------------------------- // Fields extracted with {@link PinotDispatchPlanner} // -------------------------------------------------------------------------- @@ -67,7 +69,7 @@ public class DispatchablePlanMetadata implements Serializable { // used for build mailboxes between workers. // workerId -> {planFragmentId -> mailbox list} - private final Map<Integer, Map<Integer, MailboxMetadata>> _workerIdToMailboxesMap; + private final Map<Integer, Map<Integer, MailboxInfos>> _workerIdToMailboxesMap; // used for tracking unavailable segments from routing table, then assemble missing segments exception. private final Map<String, Set<String>> _tableToUnavailableSegmentsMap; @@ -123,7 +125,7 @@ public class DispatchablePlanMetadata implements Serializable { _workerIdToSegmentsMap = workerIdToSegmentsMap; } - public Map<Integer, Map<Integer, MailboxMetadata>> getWorkerIdToMailboxesMap() { + public Map<Integer, Map<Integer, MailboxInfos>> getWorkerIdToMailboxesMap() { return _workerIdToMailboxesMap; } diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/MailboxAssignmentVisitor.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/MailboxAssignmentVisitor.java index 421e7bbc9c..7bdcdb1343 100644 --- a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/MailboxAssignmentVisitor.java +++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/MailboxAssignmentVisitor.java @@ -19,18 +19,20 @@ package org.apache.pinot.query.planner.physical; import com.google.common.base.Preconditions; +import com.google.common.collect.ImmutableList; import java.util.ArrayList; -import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.stream.Collectors; import org.apache.calcite.rel.RelDistribution; import org.apache.pinot.query.planner.plannode.DefaultPostOrderTraversalVisitor; import org.apache.pinot.query.planner.plannode.MailboxSendNode; import org.apache.pinot.query.planner.plannode.PlanNode; -import org.apache.pinot.query.routing.MailboxMetadata; +import org.apache.pinot.query.routing.MailboxInfo; +import org.apache.pinot.query.routing.MailboxInfos; import org.apache.pinot.query.routing.QueryServerInstance; -import org.apache.pinot.query.routing.VirtualServerAddress; +import org.apache.pinot.query.routing.SharedMailboxInfos; public class MailboxAssignmentVisitor extends DefaultPostOrderTraversalVisitor<Void, DispatchablePlanContext> { @@ -47,8 +49,8 @@ public class MailboxAssignmentVisitor extends DefaultPostOrderTraversalVisitor<V DispatchablePlanMetadata receiverMetadata = metadataMap.get(receiverFragmentId); Map<Integer, QueryServerInstance> senderServerMap = senderMetadata.getWorkerIdToServerInstanceMap(); Map<Integer, QueryServerInstance> receiverServerMap = receiverMetadata.getWorkerIdToServerInstanceMap(); - Map<Integer, Map<Integer, MailboxMetadata>> senderMailboxesMap = senderMetadata.getWorkerIdToMailboxesMap(); - Map<Integer, Map<Integer, MailboxMetadata>> receiverMailboxesMap = receiverMetadata.getWorkerIdToMailboxesMap(); + Map<Integer, Map<Integer, MailboxInfos>> senderMailboxesMap = senderMetadata.getWorkerIdToMailboxesMap(); + Map<Integer, Map<Integer, MailboxInfos>> receiverMailboxesMap = receiverMetadata.getWorkerIdToMailboxesMap(); int numSenders = senderServerMap.size(); int numReceivers = receiverServerMap.size(); @@ -63,11 +65,11 @@ public class MailboxAssignmentVisitor extends DefaultPostOrderTraversalVisitor<V Preconditions.checkState(senderServer.equals(receiverServer), "Got different server for SINGLETON distribution type for worker id: %s, sender: %s, receiver: %s", workerId, senderServer, receiverServer); - MailboxMetadata mailboxMetadata = new MailboxMetadata(Collections.singletonList( - MailboxIdUtils.toPlanMailboxId(senderFragmentId, workerId, receiverFragmentId, workerId)), - Collections.singletonList(new VirtualServerAddress(senderServer, workerId))); - senderMailboxesMap.computeIfAbsent(workerId, k -> new HashMap<>()).put(receiverFragmentId, mailboxMetadata); - receiverMailboxesMap.computeIfAbsent(workerId, k -> new HashMap<>()).put(senderFragmentId, mailboxMetadata); + MailboxInfos mailboxInfos = new SharedMailboxInfos( + new MailboxInfo(senderServer.getHostname(), senderServer.getQueryMailboxPort(), + ImmutableList.of(workerId))); + senderMailboxesMap.computeIfAbsent(workerId, k -> new HashMap<>()).put(receiverFragmentId, mailboxInfos); + receiverMailboxesMap.computeIfAbsent(workerId, k -> new HashMap<>()).put(senderFragmentId, mailboxInfos); } } else if (senderMetadata.isPrePartitioned() && isDirectExchangeCompatible(senderMetadata, receiverMetadata)) { // - direct exchange possible: @@ -78,82 +80,77 @@ public class MailboxAssignmentVisitor extends DefaultPostOrderTraversalVisitor<V if (partitionParallelism == 1) { // 1-to-1 mapping for (int workerId = 0; workerId < numSenders; workerId++) { - String mailboxId = MailboxIdUtils.toPlanMailboxId(senderFragmentId, workerId, receiverFragmentId, workerId); - MailboxMetadata serderMailboxMetadata = new MailboxMetadata(Collections.singletonList(mailboxId), - Collections.singletonList(new VirtualServerAddress(receiverServerMap.get(workerId), workerId))); - MailboxMetadata receiverMailboxMetadata = new MailboxMetadata(Collections.singletonList(mailboxId), - Collections.singletonList(new VirtualServerAddress(senderServerMap.get(workerId), workerId))); + QueryServerInstance senderServer = senderServerMap.get(workerId); + QueryServerInstance receiverServer = receiverServerMap.get(workerId); + List<Integer> workerIds = ImmutableList.of(workerId); + MailboxInfos senderMailboxInfos; + MailboxInfos receiverMailboxInfos; + if (senderServer.equals(receiverServer)) { + senderMailboxInfos = new SharedMailboxInfos( + new MailboxInfo(senderServer.getHostname(), senderServer.getQueryMailboxPort(), workerIds)); + receiverMailboxInfos = senderMailboxInfos; + } else { + senderMailboxInfos = new MailboxInfos( + new MailboxInfo(senderServer.getHostname(), senderServer.getQueryMailboxPort(), workerIds)); + receiverMailboxInfos = new MailboxInfos( + new MailboxInfo(receiverServer.getHostname(), receiverServer.getQueryMailboxPort(), workerIds)); + } senderMailboxesMap.computeIfAbsent(workerId, k -> new HashMap<>()) - .put(receiverFragmentId, serderMailboxMetadata); + .put(receiverFragmentId, receiverMailboxInfos); receiverMailboxesMap.computeIfAbsent(workerId, k -> new HashMap<>()) - .put(senderFragmentId, receiverMailboxMetadata); + .put(senderFragmentId, senderMailboxInfos); } } else { // 1-to-<partition_parallelism> mapping int receiverWorkerId = 0; for (int senderWorkerId = 0; senderWorkerId < numSenders; senderWorkerId++) { - VirtualServerAddress senderAddress = - new VirtualServerAddress(senderServerMap.get(senderWorkerId), senderWorkerId); - List<String> receivingMailboxIds = new ArrayList<>(partitionParallelism); - List<VirtualServerAddress> receivingAddresses = new ArrayList<>(partitionParallelism); - MailboxMetadata senderMailboxMetadata = new MailboxMetadata(receivingMailboxIds, receivingAddresses); - senderMailboxesMap.computeIfAbsent(senderWorkerId, k -> new HashMap<>()) - .put(receiverFragmentId, senderMailboxMetadata); + QueryServerInstance senderServer = senderServerMap.get(senderWorkerId); + QueryServerInstance receiverServer = receiverServerMap.get(receiverWorkerId); + List<Integer> receiverWorkerIds = new ArrayList<>(partitionParallelism); + senderMailboxesMap.computeIfAbsent(senderWorkerId, k -> new HashMap<>()).put(receiverFragmentId, + new MailboxInfos(new MailboxInfo(receiverServer.getHostname(), receiverServer.getQueryMailboxPort(), + receiverWorkerIds))); + MailboxInfos senderMailboxInfos = new SharedMailboxInfos( + new MailboxInfo(senderServer.getHostname(), senderServer.getQueryMailboxPort(), + ImmutableList.of(senderWorkerId))); for (int i = 0; i < partitionParallelism; i++) { - String mailboxId = MailboxIdUtils.toPlanMailboxId(senderFragmentId, senderWorkerId, receiverFragmentId, - receiverWorkerId); - receivingMailboxIds.add(mailboxId); - receivingAddresses.add( - new VirtualServerAddress(receiverServerMap.get(receiverWorkerId), receiverWorkerId)); - - MailboxMetadata receiverMailboxMetadata = - receiverMailboxesMap.computeIfAbsent(receiverWorkerId, k -> new HashMap<>()) - .computeIfAbsent(senderFragmentId, k -> new MailboxMetadata()); - receiverMailboxMetadata.getMailboxIds().add(mailboxId); - receiverMailboxMetadata.getVirtualAddresses().add(senderAddress); - + receiverWorkerIds.add(receiverWorkerId); + receiverMailboxesMap.computeIfAbsent(receiverWorkerId, k -> new HashMap<>()) + .put(senderFragmentId, senderMailboxInfos); receiverWorkerId++; } } } } else { // For other exchange types, send the data to all the instances in the receiver fragment - // NOTE: Keep the receiver worker id sequential in the senderMailboxMetadata so that the partitionId aligns with - // the workerId. It is useful for JOIN query when only left table is partitioned. // TODO: Add support for more exchange types + List<MailboxInfo> receiverMailboxInfoList = getMailboxInfos(receiverServerMap); + MailboxInfos receiverMailboxInfos = numSenders > 1 ? new SharedMailboxInfos(receiverMailboxInfoList) + : new MailboxInfos(receiverMailboxInfoList); for (int senderWorkerId = 0; senderWorkerId < numSenders; senderWorkerId++) { - VirtualServerAddress senderAddress = - new VirtualServerAddress(senderServerMap.get(senderWorkerId), senderWorkerId); - List<String> receivingMailboxIds = new ArrayList<>(numReceivers); - List<VirtualServerAddress> receivingAddresses = new ArrayList<>(numReceivers); - MailboxMetadata senderMailboxMetadata = new MailboxMetadata(receivingMailboxIds, receivingAddresses); senderMailboxesMap.computeIfAbsent(senderWorkerId, k -> new HashMap<>()) - .put(receiverFragmentId, senderMailboxMetadata); - for (int receiverWorkerId = 0; receiverWorkerId < numReceivers; receiverWorkerId++) { - String mailboxId = - MailboxIdUtils.toPlanMailboxId(senderFragmentId, senderWorkerId, receiverFragmentId, receiverWorkerId); - receivingMailboxIds.add(mailboxId); - receivingAddresses.add(new VirtualServerAddress(receiverServerMap.get(receiverWorkerId), receiverWorkerId)); - - MailboxMetadata receiverMailboxMetadata = - receiverMailboxesMap.computeIfAbsent(receiverWorkerId, k -> new HashMap<>()) - .computeIfAbsent(senderFragmentId, k -> new MailboxMetadata()); - receiverMailboxMetadata.getMailboxIds().add(mailboxId); - receiverMailboxMetadata.getVirtualAddresses().add(senderAddress); - } + .put(receiverFragmentId, receiverMailboxInfos); + } + List<MailboxInfo> senderMailboxInfoList = getMailboxInfos(senderServerMap); + MailboxInfos senderMailboxInfos = + numReceivers > 1 ? new SharedMailboxInfos(senderMailboxInfoList) : new MailboxInfos(senderMailboxInfoList); + for (int receiverWorkerId = 0; receiverWorkerId < numReceivers; receiverWorkerId++) { + receiverMailboxesMap.computeIfAbsent(receiverWorkerId, k -> new HashMap<>()) + .put(senderFragmentId, senderMailboxInfos); } } } return null; } - private boolean isDirectExchangeCompatible(DispatchablePlanMetadata sender, DispatchablePlanMetadata receiver) { + private static boolean isDirectExchangeCompatible(DispatchablePlanMetadata sender, + DispatchablePlanMetadata receiver) { Map<Integer, QueryServerInstance> senderServerMap = sender.getWorkerIdToServerInstanceMap(); Map<Integer, QueryServerInstance> receiverServerMap = receiver.getWorkerIdToServerInstanceMap(); int numSenders = senderServerMap.size(); int numReceivers = receiverServerMap.size(); - if (sender.getScannedTables().size() > 0 && receiver.getScannedTables().size() == 0) { + if (!sender.getScannedTables().isEmpty() && receiver.getScannedTables().isEmpty()) { // leaf-to-intermediate condition return numSenders * sender.getPartitionParallelism() == numReceivers && sender.getPartitionFunction() != null && sender.getPartitionFunction().equalsIgnoreCase(receiver.getPartitionFunction()); @@ -163,4 +160,15 @@ public class MailboxAssignmentVisitor extends DefaultPostOrderTraversalVisitor<V .equalsIgnoreCase(receiver.getPartitionFunction()); } } + + private static List<MailboxInfo> getMailboxInfos(Map<Integer, QueryServerInstance> workerIdToServerMap) { + Map<QueryServerInstance, List<Integer>> serverToWorkerIdsMap = new HashMap<>(); + int numServers = workerIdToServerMap.size(); + for (int workerId = 0; workerId < numServers; workerId++) { + serverToWorkerIdsMap.computeIfAbsent(workerIdToServerMap.get(workerId), k -> new ArrayList<>()).add(workerId); + } + return serverToWorkerIdsMap.entrySet().stream() + .map(e -> new MailboxInfo(e.getKey().getHostname(), e.getKey().getQueryMailboxPort(), e.getValue())) + .collect(Collectors.toList()); + } } diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/MailboxIdUtils.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/MailboxIdUtils.java index 32c7d3197a..f81e243dc7 100644 --- a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/MailboxIdUtils.java +++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/MailboxIdUtils.java @@ -19,9 +19,10 @@ package org.apache.pinot.query.planner.physical; import com.google.common.annotations.VisibleForTesting; +import java.util.ArrayList; import java.util.List; -import java.util.stream.Collectors; -import org.apache.pinot.query.routing.MailboxMetadata; +import org.apache.pinot.query.routing.MailboxInfo; +import org.apache.pinot.query.routing.RoutingInfo; public class MailboxIdUtils { @@ -30,23 +31,35 @@ public class MailboxIdUtils { public static final char SEPARATOR = '|'; - public static String toPlanMailboxId(int senderStageId, int senderWorkerId, int receiverStageId, + @VisibleForTesting + public static String toMailboxId(long requestId, int senderStageId, int senderWorkerId, int receiverStageId, int receiverWorkerId) { - return Integer.toString(senderStageId) + SEPARATOR + senderWorkerId + SEPARATOR + receiverStageId + SEPARATOR - + receiverWorkerId; - } - - public static String toMailboxId(long requestId, String planMailboxId) { - return Long.toString(requestId) + SEPARATOR + planMailboxId; + return Long.toString(requestId) + SEPARATOR + senderStageId + SEPARATOR + senderWorkerId + SEPARATOR + + receiverStageId + SEPARATOR + receiverWorkerId; } - public static List<String> toMailboxIds(long requestId, MailboxMetadata mailboxMetadata) { - return mailboxMetadata.getMailboxIds().stream().map(v -> toMailboxId(requestId, v)).collect(Collectors.toList()); + public static List<RoutingInfo> toRoutingInfos(long requestId, int senderStageId, int senderWorkerId, + int receiverStageId, List<MailboxInfo> receiverMailboxInfos) { + List<RoutingInfo> routingInfos = new ArrayList<>(); + for (MailboxInfo mailboxInfo : receiverMailboxInfos) { + String hostname = mailboxInfo.getHostname(); + int port = mailboxInfo.getPort(); + for (int receiverWorkerId : mailboxInfo.getWorkerIds()) { + routingInfos.add(new RoutingInfo(hostname, port, + toMailboxId(requestId, senderStageId, senderWorkerId, receiverStageId, receiverWorkerId))); + } + } + return routingInfos; } - @VisibleForTesting - public static String toMailboxId(long requestId, int senderStageId, int senderWorkerId, int receiverStageId, - int receiverWorkerId) { - return toMailboxId(requestId, toPlanMailboxId(senderStageId, senderWorkerId, receiverStageId, receiverWorkerId)); + public static List<String> toMailboxIds(long requestId, int senderStageId, List<MailboxInfo> senderMailboxInfos, + int receiverStageId, int receiverWorkerId) { + List<String> mailboxIds = new ArrayList<>(); + for (MailboxInfo mailboxInfo : senderMailboxInfos) { + for (int senderWorkerId : mailboxInfo.getWorkerIds()) { + mailboxIds.add(toMailboxId(requestId, senderStageId, senderWorkerId, receiverStageId, receiverWorkerId)); + } + } + return mailboxIds; } } diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/routing/MailboxMetadata.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/routing/MailboxInfo.java similarity index 50% rename from pinot-query-planner/src/main/java/org/apache/pinot/query/routing/MailboxMetadata.java rename to pinot-query-planner/src/main/java/org/apache/pinot/query/routing/MailboxInfo.java index b3484d1a7b..b7ad8c5f61 100644 --- a/pinot-query-planner/src/main/java/org/apache/pinot/query/routing/MailboxMetadata.java +++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/routing/MailboxInfo.java @@ -18,42 +18,37 @@ */ package org.apache.pinot.query.routing; -import java.util.ArrayList; import java.util.List; /** - * {@code MailboxMetadata} wraps around a list of mailboxes information from/to one connected stage. - * It contains the following information: - * <ul> - * <li>MailboxId: the unique id of the mailbox</li> - * <li>VirtualAddress: the virtual address of the mailbox</li> - * </ul> + * {@code MailboxInfo} wraps the mailbox information from/to one connected server. */ -public class MailboxMetadata { - private final List<String> _mailboxIds; - private final List<VirtualServerAddress> _virtualAddresses; - - public MailboxMetadata() { - _mailboxIds = new ArrayList<>(); - _virtualAddresses = new ArrayList<>(); +public class MailboxInfo { + private final String _hostname; + private final int _port; + private final List<Integer> _workerIds; + + public MailboxInfo(String hostname, int port, List<Integer> workerIds) { + _hostname = hostname; + _port = port; + _workerIds = workerIds; } - public MailboxMetadata(List<String> mailboxIds, List<VirtualServerAddress> virtualAddresses) { - _mailboxIds = mailboxIds; - _virtualAddresses = virtualAddresses; + public String getHostname() { + return _hostname; } - public List<String> getMailboxIds() { - return _mailboxIds; + public int getPort() { + return _port; } - public List<VirtualServerAddress> getVirtualAddresses() { - return _virtualAddresses; + public List<Integer> getWorkerIds() { + return _workerIds; } @Override public String toString() { - return _mailboxIds + "@" + _virtualAddresses; + return _hostname + ":" + _port + "|" + _workerIds; } } diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/StagePlan.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/routing/MailboxInfos.java similarity index 50% copy from pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/StagePlan.java copy to pinot-query-planner/src/main/java/org/apache/pinot/query/routing/MailboxInfos.java index a45b48c5de..76c6e94224 100644 --- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/StagePlan.java +++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/routing/MailboxInfos.java @@ -16,36 +16,29 @@ * specific language governing permissions and limitations * under the License. */ -package org.apache.pinot.query.runtime.plan; +package org.apache.pinot.query.routing; -import org.apache.pinot.query.planner.plannode.PlanNode; +import com.google.common.collect.ImmutableList; +import com.google.protobuf.ByteString; +import java.util.List; -/** - * {@code StagePlan} is the deserialized version of the {@link org.apache.pinot.common.proto.Worker.StagePlan}. - * - * <p>It is also the extended version of the {@link org.apache.pinot.core.query.request.ServerQueryRequest}. - */ -public class StagePlan { - private final int _stageId; - private final PlanNode _rootNode; - private final StageMetadata _stageMetadata; +public class MailboxInfos { + private final List<MailboxInfo> _mailboxInfos; - public StagePlan(int stageId, PlanNode rootNode, StageMetadata stageMetadata) { - _stageId = stageId; - _rootNode = rootNode; - _stageMetadata = stageMetadata; + public MailboxInfos(List<MailboxInfo> mailboxInfos) { + _mailboxInfos = mailboxInfos; } - public int getStageId() { - return _stageId; + public MailboxInfos(MailboxInfo mailboxInfo) { + _mailboxInfos = ImmutableList.of(mailboxInfo); } - public PlanNode getRootNode() { - return _rootNode; + public List<MailboxInfo> getMailboxInfos() { + return _mailboxInfos; } - public StageMetadata getStageMetadata() { - return _stageMetadata; + public ByteString toProtoBytes() { + return QueryPlanSerDeUtils.toProtoMailboxInfos(_mailboxInfos).toByteString(); } } diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/serde/QueryPlanSerDeUtils.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/routing/QueryPlanSerDeUtils.java similarity index 53% rename from pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/serde/QueryPlanSerDeUtils.java rename to pinot-query-planner/src/main/java/org/apache/pinot/query/routing/QueryPlanSerDeUtils.java index fbfb9487b4..d692d634b4 100644 --- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/serde/QueryPlanSerDeUtils.java +++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/routing/QueryPlanSerDeUtils.java @@ -16,10 +16,12 @@ * specific language governing permissions and limitations * under the License. */ -package org.apache.pinot.query.runtime.plan.serde; +package org.apache.pinot.query.routing; +import com.google.common.collect.Maps; import com.google.protobuf.ByteString; import com.google.protobuf.InvalidProtocolBufferException; +import java.util.ArrayList; import java.util.List; import java.util.Map; import java.util.stream.Collectors; @@ -27,11 +29,6 @@ import org.apache.pinot.common.proto.Plan; import org.apache.pinot.common.proto.Worker; import org.apache.pinot.query.planner.plannode.AbstractPlanNode; import org.apache.pinot.query.planner.plannode.StageNodeSerDeUtils; -import org.apache.pinot.query.routing.MailboxMetadata; -import org.apache.pinot.query.routing.VirtualServerAddress; -import org.apache.pinot.query.routing.WorkerMetadata; -import org.apache.pinot.query.runtime.plan.StageMetadata; -import org.apache.pinot.query.runtime.plan.StagePlan; /** @@ -46,30 +43,35 @@ public class QueryPlanSerDeUtils { AbstractPlanNode rootNode = StageNodeSerDeUtils.deserializeStageNode(Plan.StageNode.parseFrom(protoStagePlan.getRootNode())); StageMetadata stageMetadata = fromProtoStageMetadata(protoStagePlan.getStageMetadata()); - return new StagePlan(protoStagePlan.getStageId(), rootNode, stageMetadata); + return new StagePlan(rootNode, stageMetadata); } private static StageMetadata fromProtoStageMetadata(Worker.StageMetadata protoStageMetadata) throws InvalidProtocolBufferException { - List<WorkerMetadata> workerMetadataList = - protoStageMetadata.getWorkerMetadataList().stream().map(QueryPlanSerDeUtils::fromProtoWorkerMetadata) - .collect(Collectors.toList()); + List<Worker.WorkerMetadata> protoWorkerMetadataList = protoStageMetadata.getWorkerMetadataList(); + List<WorkerMetadata> workerMetadataList = new ArrayList<>(protoWorkerMetadataList.size()); + for (Worker.WorkerMetadata protoWorkerMetadata : protoWorkerMetadataList) { + workerMetadataList.add(fromProtoWorkerMetadata(protoWorkerMetadata)); + } Map<String, String> customProperties = fromProtoProperties(protoStageMetadata.getCustomProperty()); - return new StageMetadata(workerMetadataList, customProperties); + return new StageMetadata(protoStageMetadata.getStageId(), workerMetadataList, customProperties); } - private static WorkerMetadata fromProtoWorkerMetadata(Worker.WorkerMetadata protoWorkerMetadata) { - VirtualServerAddress virtualAddress = VirtualServerAddress.parse(protoWorkerMetadata.getVirtualAddress()); - Map<Integer, MailboxMetadata> mailboxMetadataMap = protoWorkerMetadata.getMailboxMetadataMap().entrySet().stream() - .collect(Collectors.toMap(Map.Entry::getKey, e -> fromProtoMailbox(e.getValue()))); - return new WorkerMetadata(virtualAddress, mailboxMetadataMap, protoWorkerMetadata.getCustomPropertyMap()); + private static WorkerMetadata fromProtoWorkerMetadata(Worker.WorkerMetadata protoWorkerMetadata) + throws InvalidProtocolBufferException { + Map<Integer, ByteString> protoMailboxInfosMap = protoWorkerMetadata.getMailboxInfosMap(); + Map<Integer, MailboxInfos> mailboxInfosMap = Maps.newHashMapWithExpectedSize(protoMailboxInfosMap.size()); + for (Map.Entry<Integer, ByteString> entry : protoMailboxInfosMap.entrySet()) { + mailboxInfosMap.put(entry.getKey(), fromProtoMailboxInfos(entry.getValue())); + } + return new WorkerMetadata(protoWorkerMetadata.getWorkedId(), mailboxInfosMap, + protoWorkerMetadata.getCustomPropertyMap()); } - private static MailboxMetadata fromProtoMailbox(Worker.MailboxMetadata protoMailboxMetadata) { - List<VirtualServerAddress> virtualAddresses = - protoMailboxMetadata.getVirtualAddressList().stream().map(VirtualServerAddress::parse) - .collect(Collectors.toList()); - return new MailboxMetadata(protoMailboxMetadata.getMailboxIdList(), virtualAddresses); + private static MailboxInfos fromProtoMailboxInfos(ByteString protoMailboxInfos) + throws InvalidProtocolBufferException { + return new MailboxInfos(Worker.MailboxInfos.parseFrom(protoMailboxInfos).getMailboxInfoList().stream() + .map(v -> new MailboxInfo(v.getHostname(), v.getPort(), v.getWorkerIdList())).collect(Collectors.toList())); } public static Map<String, String> fromProtoProperties(ByteString protoProperties) @@ -82,19 +84,17 @@ public class QueryPlanSerDeUtils { } private static Worker.WorkerMetadata toProtoWorkerMetadata(WorkerMetadata workerMetadata) { - Map<Integer, Worker.MailboxMetadata> protoMailboxMetadataMap = - workerMetadata.getMailboxMetadataMap().entrySet().stream() - .collect(Collectors.toMap(Map.Entry::getKey, e -> toProtoMailboxMetadata(e.getValue()))); - return Worker.WorkerMetadata.newBuilder().setVirtualAddress(workerMetadata.getVirtualAddress().toString()) - .putAllMailboxMetadata(protoMailboxMetadataMap).putAllCustomProperty(workerMetadata.getCustomProperties()) - .build(); + Map<Integer, ByteString> mailboxInfosMap = workerMetadata.getMailboxInfosMap().entrySet().stream() + .collect(Collectors.toMap(Map.Entry::getKey, e -> e.getValue().toProtoBytes())); + return Worker.WorkerMetadata.newBuilder().setWorkedId(workerMetadata.getWorkerId()) + .putAllMailboxInfos(mailboxInfosMap).putAllCustomProperty(workerMetadata.getCustomProperties()).build(); } - private static Worker.MailboxMetadata toProtoMailboxMetadata(MailboxMetadata mailboxMetadata) { - List<String> virtualAddresses = - mailboxMetadata.getVirtualAddresses().stream().map(VirtualServerAddress::toString).collect(Collectors.toList()); - return Worker.MailboxMetadata.newBuilder().addAllMailboxId(mailboxMetadata.getMailboxIds()) - .addAllVirtualAddress(virtualAddresses).build(); + public static Worker.MailboxInfos toProtoMailboxInfos(List<MailboxInfo> mailboxInfos) { + List<Worker.MailboxInfo> protoMailboxInfos = mailboxInfos.stream().map( + v -> Worker.MailboxInfo.newBuilder().setHostname(v.getHostname()).setPort(v.getPort()) + .addAllWorkerId(v.getWorkerIds()).build()).collect(Collectors.toList()); + return Worker.MailboxInfos.newBuilder().addAllMailboxInfo(protoMailboxInfos).build(); } public static ByteString toProtoProperties(Map<String, String> properties) { diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/StagePlan.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/routing/RoutingInfo.java similarity index 50% copy from pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/StagePlan.java copy to pinot-query-planner/src/main/java/org/apache/pinot/query/routing/RoutingInfo.java index a45b48c5de..7b92899237 100644 --- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/StagePlan.java +++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/routing/RoutingInfo.java @@ -16,36 +16,28 @@ * specific language governing permissions and limitations * under the License. */ -package org.apache.pinot.query.runtime.plan; +package org.apache.pinot.query.routing; -import org.apache.pinot.query.planner.plannode.PlanNode; +public class RoutingInfo { + private final String _hostname; + private final int _port; + private final String _mailboxId; - -/** - * {@code StagePlan} is the deserialized version of the {@link org.apache.pinot.common.proto.Worker.StagePlan}. - * - * <p>It is also the extended version of the {@link org.apache.pinot.core.query.request.ServerQueryRequest}. - */ -public class StagePlan { - private final int _stageId; - private final PlanNode _rootNode; - private final StageMetadata _stageMetadata; - - public StagePlan(int stageId, PlanNode rootNode, StageMetadata stageMetadata) { - _stageId = stageId; - _rootNode = rootNode; - _stageMetadata = stageMetadata; + public RoutingInfo(String hostname, int port, String mailboxId) { + _hostname = hostname; + _port = port; + _mailboxId = mailboxId; } - public int getStageId() { - return _stageId; + public String getHostname() { + return _hostname; } - public PlanNode getRootNode() { - return _rootNode; + public int getPort() { + return _port; } - public StageMetadata getStageMetadata() { - return _stageMetadata; + public String getMailboxId() { + return _mailboxId; } } diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/StagePlan.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/routing/SharedMailboxInfos.java similarity index 50% copy from pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/StagePlan.java copy to pinot-query-planner/src/main/java/org/apache/pinot/query/routing/SharedMailboxInfos.java index a45b48c5de..15fb1e666a 100644 --- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/StagePlan.java +++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/routing/SharedMailboxInfos.java @@ -16,36 +16,32 @@ * specific language governing permissions and limitations * under the License. */ -package org.apache.pinot.query.runtime.plan; +package org.apache.pinot.query.routing; -import org.apache.pinot.query.planner.plannode.PlanNode; +import com.google.protobuf.ByteString; +import java.util.List; /** - * {@code StagePlan} is the deserialized version of the {@link org.apache.pinot.common.proto.Worker.StagePlan}. - * - * <p>It is also the extended version of the {@link org.apache.pinot.core.query.request.ServerQueryRequest}. + * {@code SharedMailboxInfos} is the shared version of the {@link MailboxInfos} which can cache the proto bytes and + * reduce overhead of serialization. */ -public class StagePlan { - private final int _stageId; - private final PlanNode _rootNode; - private final StageMetadata _stageMetadata; - - public StagePlan(int stageId, PlanNode rootNode, StageMetadata stageMetadata) { - _stageId = stageId; - _rootNode = rootNode; - _stageMetadata = stageMetadata; - } +public class SharedMailboxInfos extends MailboxInfos { + private ByteString _protoBytes; - public int getStageId() { - return _stageId; + public SharedMailboxInfos(List<MailboxInfo> mailboxInfos) { + super(mailboxInfos); } - public PlanNode getRootNode() { - return _rootNode; + public SharedMailboxInfos(MailboxInfo mailboxInfo) { + super(mailboxInfo); } - public StageMetadata getStageMetadata() { - return _stageMetadata; + @Override + public synchronized ByteString toProtoBytes() { + if (_protoBytes == null) { + _protoBytes = super.toProtoBytes(); + } + return _protoBytes; } } diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/StageMetadata.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/routing/StageMetadata.java similarity index 88% rename from pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/StageMetadata.java rename to pinot-query-planner/src/main/java/org/apache/pinot/query/routing/StageMetadata.java index a07a04a0b7..6eca815d68 100644 --- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/StageMetadata.java +++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/routing/StageMetadata.java @@ -16,27 +16,32 @@ * specific language governing permissions and limitations * under the License. */ -package org.apache.pinot.query.runtime.plan; +package org.apache.pinot.query.routing; import java.util.List; import java.util.Map; import org.apache.pinot.core.routing.TimeBoundaryInfo; import org.apache.pinot.query.planner.physical.DispatchablePlanFragment; -import org.apache.pinot.query.routing.WorkerMetadata; /** * {@code StageMetadata} is used to send plan fragment-level info about how to execute a stage physically. */ public class StageMetadata { + private final int _stageId; private final List<WorkerMetadata> _workerMetadataList; private final Map<String, String> _customProperties; - public StageMetadata(List<WorkerMetadata> workerMetadataList, Map<String, String> customProperties) { + public StageMetadata(int stageId, List<WorkerMetadata> workerMetadataList, Map<String, String> customProperties) { + _stageId = stageId; _workerMetadataList = workerMetadataList; _customProperties = customProperties; } + public int getStageId() { + return _stageId; + } + public List<WorkerMetadata> getWorkerMetadataList() { return _workerMetadataList; } diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/StagePlan.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/routing/StagePlan.java similarity index 85% rename from pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/StagePlan.java rename to pinot-query-planner/src/main/java/org/apache/pinot/query/routing/StagePlan.java index a45b48c5de..7dfa00268d 100644 --- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/StagePlan.java +++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/routing/StagePlan.java @@ -16,7 +16,7 @@ * specific language governing permissions and limitations * under the License. */ -package org.apache.pinot.query.runtime.plan; +package org.apache.pinot.query.routing; import org.apache.pinot.query.planner.plannode.PlanNode; @@ -27,20 +27,14 @@ import org.apache.pinot.query.planner.plannode.PlanNode; * <p>It is also the extended version of the {@link org.apache.pinot.core.query.request.ServerQueryRequest}. */ public class StagePlan { - private final int _stageId; private final PlanNode _rootNode; private final StageMetadata _stageMetadata; - public StagePlan(int stageId, PlanNode rootNode, StageMetadata stageMetadata) { - _stageId = stageId; + public StagePlan(PlanNode rootNode, StageMetadata stageMetadata) { _rootNode = rootNode; _stageMetadata = stageMetadata; } - public int getStageId() { - return _stageId; - } - public PlanNode getRootNode() { return _rootNode; } diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/routing/WorkerMetadata.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/routing/WorkerMetadata.java index 3392261980..ab1226862e 100644 --- a/pinot-query-planner/src/main/java/org/apache/pinot/query/routing/WorkerMetadata.java +++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/routing/WorkerMetadata.java @@ -44,29 +44,29 @@ import org.apache.pinot.spi.utils.JsonUtils; public class WorkerMetadata { public static final String TABLE_SEGMENTS_MAP_KEY = "tableSegmentsMap"; - private final VirtualServerAddress _virtualAddress; - private final Map<Integer, MailboxMetadata> _mailboxMetadataMap; + private final int _workerId; + private final Map<Integer, MailboxInfos> _mailboxInfosMap; private final Map<String, String> _customProperties; - public WorkerMetadata(VirtualServerAddress virtualAddress, Map<Integer, MailboxMetadata> mailboxMetadataMap) { - _virtualAddress = virtualAddress; - _mailboxMetadataMap = mailboxMetadataMap; + public WorkerMetadata(int workerId, Map<Integer, MailboxInfos> mailboxInfosMap) { + _workerId = workerId; + _mailboxInfosMap = mailboxInfosMap; _customProperties = new HashMap<>(); } - public WorkerMetadata(VirtualServerAddress virtualAddress, Map<Integer, MailboxMetadata> mailboxMetadataMap, + public WorkerMetadata(int workerId, Map<Integer, MailboxInfos> mailboxInfosMap, Map<String, String> customProperties) { - _virtualAddress = virtualAddress; - _mailboxMetadataMap = mailboxMetadataMap; + _workerId = workerId; + _mailboxInfosMap = mailboxInfosMap; _customProperties = customProperties; } - public VirtualServerAddress getVirtualAddress() { - return _virtualAddress; + public int getWorkerId() { + return _workerId; } - public Map<Integer, MailboxMetadata> getMailboxMetadataMap() { - return _mailboxMetadataMap; + public Map<Integer, MailboxInfos> getMailboxInfosMap() { + return _mailboxInfosMap; } public Map<String, String> getCustomProperties() { diff --git a/pinot-query-planner/src/test/java/org/apache/pinot/query/QueryCompilationTest.java b/pinot-query-planner/src/test/java/org/apache/pinot/query/QueryCompilationTest.java index 20d798c9c6..4bf911c805 100644 --- a/pinot-query-planner/src/test/java/org/apache/pinot/query/QueryCompilationTest.java +++ b/pinot-query-planner/src/test/java/org/apache/pinot/query/QueryCompilationTest.java @@ -19,7 +19,6 @@ package org.apache.pinot.query; import com.google.common.collect.ImmutableList; -import com.google.common.collect.ImmutableSet; import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; @@ -30,7 +29,6 @@ import java.util.concurrent.locks.ReentrantLock; import java.util.stream.Collectors; import org.apache.calcite.rel.RelDistribution; import org.apache.pinot.query.planner.PlannerUtils; -import org.apache.pinot.query.planner.explain.PhysicalExplainPlanVisitor; import org.apache.pinot.query.planner.physical.DispatchablePlanFragment; import org.apache.pinot.query.planner.physical.DispatchableSubPlan; import org.apache.pinot.query.planner.plannode.AbstractPlanNode; @@ -40,53 +38,43 @@ import org.apache.pinot.query.planner.plannode.JoinNode; import org.apache.pinot.query.planner.plannode.MailboxReceiveNode; import org.apache.pinot.query.planner.plannode.PlanNode; import org.apache.pinot.query.planner.plannode.ProjectNode; -import org.testng.Assert; +import org.apache.pinot.query.routing.QueryServerInstance; import org.testng.annotations.DataProvider; import org.testng.annotations.Test; +import static org.testng.Assert.*; + public class QueryCompilationTest extends QueryEnvironmentTestBase { @Test(dataProvider = "testQueryLogicalPlanDataProvider") - public void testQueryPlanExplainLogical(String query, String digest) - throws Exception { + public void testQueryPlanExplainLogical(String query, String digest) { testQueryPlanExplain(query, digest); } private void testQueryPlanExplain(String query, String digest) { - try { - long requestId = RANDOM_REQUEST_ID_GEN.nextLong(); - String explainedPlan = _queryEnvironment.explainQuery(query, requestId); - Assert.assertEquals(explainedPlan, digest); - } catch (RuntimeException e) { - Assert.fail("failed to explain query: " + query, e); - } + long requestId = RANDOM_REQUEST_ID_GEN.nextLong(); + String explainedPlan = _queryEnvironment.explainQuery(query, requestId); + assertEquals(explainedPlan, digest); } @Test(dataProvider = "testQueryDataProvider") - public void testQueryPlanWithoutException(String query) - throws Exception { - try { - DispatchableSubPlan dispatchableSubPlan = _queryEnvironment.planQuery(query); - Assert.assertNotNull(dispatchableSubPlan); - } catch (RuntimeException e) { - Assert.fail("failed to plan query: " + query, e); - } + public void testQueryPlanWithoutException(String query) { + DispatchableSubPlan dispatchableSubPlan = _queryEnvironment.planQuery(query); + assertNotNull(dispatchableSubPlan); } @Test(dataProvider = "testQueryExceptionDataProvider") public void testQueryWithException(String query, String exceptionSnippet) { try { _queryEnvironment.planQuery(query); - Assert.fail("query plan should throw exception"); + fail("query plan should throw exception"); } catch (RuntimeException e) { - Assert.assertTrue(e.getCause().getMessage().contains(exceptionSnippet)); + assertTrue(e.getCause().getMessage().contains(exceptionSnippet)); } } - private static void assertGroupBySingletonAfterJoin(DispatchableSubPlan dispatchableSubPlan, boolean shouldRewrite) - throws Exception { - + private static void assertGroupBySingletonAfterJoin(DispatchableSubPlan dispatchableSubPlan, boolean shouldRewrite) { for (int stageId = 0; stageId < dispatchableSubPlan.getQueryStageList().size(); stageId++) { if (dispatchableSubPlan.getTableNames().size() == 0 && !PlannerUtils.isRootPlanFragment(stageId)) { PlanNode node = dispatchableSubPlan.getQueryStageList().get(stageId).getPlanFragment().getFragmentRoot(); @@ -95,17 +83,17 @@ public class QueryCompilationTest extends QueryEnvironmentTestBase { // JOIN is exchanged with hash distribution (data shuffle) MailboxReceiveNode left = (MailboxReceiveNode) node.getInputs().get(0); MailboxReceiveNode right = (MailboxReceiveNode) node.getInputs().get(1); - Assert.assertEquals(left.getDistributionType(), RelDistribution.Type.HASH_DISTRIBUTED); - Assert.assertEquals(right.getDistributionType(), RelDistribution.Type.HASH_DISTRIBUTED); + assertEquals(left.getDistributionType(), RelDistribution.Type.HASH_DISTRIBUTED); + assertEquals(right.getDistributionType(), RelDistribution.Type.HASH_DISTRIBUTED); break; } if (node instanceof AggregateNode && node.getInputs().get(0) instanceof MailboxReceiveNode) { // AGG is exchanged with singleton since it has already been distributed by JOIN. MailboxReceiveNode input = (MailboxReceiveNode) node.getInputs().get(0); if (shouldRewrite) { - Assert.assertEquals(input.getDistributionType(), RelDistribution.Type.SINGLETON); + assertEquals(input.getDistributionType(), RelDistribution.Type.SINGLETON); } else { - Assert.assertNotEquals(input.getDistributionType(), RelDistribution.Type.SINGLETON); + assertNotEquals(input.getDistributionType(), RelDistribution.Type.SINGLETON); } break; } @@ -116,34 +104,40 @@ public class QueryCompilationTest extends QueryEnvironmentTestBase { } @Test - public void testQueryAndAssertStageContentForJoin() - throws Exception { + public void testQueryAndAssertStageContentForJoin() { String query = "SELECT * FROM a JOIN b ON a.col1 = b.col2"; DispatchableSubPlan dispatchableSubPlan = _queryEnvironment.planQuery(query); - Assert.assertEquals(dispatchableSubPlan.getQueryStageList().size(), 4); - - for (int stageId = 0; stageId < dispatchableSubPlan.getQueryStageList().size(); stageId++) { - DispatchablePlanFragment dispatchablePlanFragment = dispatchableSubPlan.getQueryStageList().get(stageId); - String tableName = dispatchablePlanFragment.getTableName(); + List<DispatchablePlanFragment> stagePlans = dispatchableSubPlan.getQueryStageList(); + int numStages = stagePlans.size(); + assertEquals(numStages, 4); + for (int stageId = 0; stageId < numStages; stageId++) { + DispatchablePlanFragment stagePlan = stagePlans.get(stageId); + Map<QueryServerInstance, List<Integer>> serverToWorkerIdsMap = stagePlan.getServerInstanceToWorkerIdMap(); + int numServers = serverToWorkerIdsMap.size(); + String tableName = stagePlan.getTableName(); if (tableName != null) { // table scan stages; for tableA it should have 2 hosts, for tableB it should have only 1 - Assert.assertEquals(dispatchablePlanFragment.getServerInstanceToWorkerIdMap().entrySet().stream() - .map(PhysicalExplainPlanVisitor::stringifyQueryServerInstanceToWorkerIdsEntry) - .collect(Collectors.toSet()), - tableName.equals("a") ? ImmutableList.of("localhost:1|[1]", "localhost:2|[0]") - : ImmutableList.of("localhost:1|[0]")); + if (tableName.equals("a")) { + assertEquals(numServers, 2); + for (QueryServerInstance server : serverToWorkerIdsMap.keySet()) { + int port = server.getQueryMailboxPort(); + assertTrue(port == 1 || port == 2); + } + } else { + assertEquals(numServers, 1); + assertEquals(serverToWorkerIdsMap.keySet().iterator().next().getQueryMailboxPort(), 1); + } } else if (!PlannerUtils.isRootPlanFragment(stageId)) { // join stage should have both servers used. - Assert.assertEquals(dispatchablePlanFragment.getServerInstanceToWorkerIdMap().entrySet().stream() - .map(PhysicalExplainPlanVisitor::stringifyQueryServerInstanceToWorkerIdsEntry) - .collect(Collectors.toSet()), - ImmutableSet.of("localhost:1|[1]", "localhost:2|[0]")); + assertEquals(numServers, 2); + for (QueryServerInstance server : serverToWorkerIdsMap.keySet()) { + int port = server.getQueryMailboxPort(); + assertTrue(port == 1 || port == 2); + } } else { // reduce stage should have the reducer instance. - Assert.assertEquals(dispatchablePlanFragment.getServerInstanceToWorkerIdMap().entrySet().stream() - .map(PhysicalExplainPlanVisitor::stringifyQueryServerInstanceToWorkerIdsEntry) - .collect(Collectors.toSet()), - ImmutableSet.of("localhost:3|[0]")); + assertEquals(numServers, 1); + assertEquals(serverToWorkerIdsMap.keySet().iterator().next().getQueryMailboxPort(), 3); } } } @@ -167,24 +161,27 @@ public class QueryCompilationTest extends QueryEnvironmentTestBase { public void testQueryRoutingManagerCompilation() { String query = "SELECT * FROM d_OFFLINE"; DispatchableSubPlan dispatchableSubPlan = _queryEnvironment.planQuery(query); - List<DispatchablePlanFragment> tableScanMetadataList = dispatchableSubPlan.getQueryStageList().stream() - .filter(stageMetadata -> stageMetadata.getTableName() != null).collect(Collectors.toList()); - Assert.assertEquals(tableScanMetadataList.size(), 1); - Assert.assertEquals(tableScanMetadataList.get(0).getServerInstanceToWorkerIdMap().size(), 2); + List<DispatchablePlanFragment> tableScanMetadataList = + dispatchableSubPlan.getQueryStageList().stream().filter(stageMetadata -> stageMetadata.getTableName() != null) + .collect(Collectors.toList()); + assertEquals(tableScanMetadataList.size(), 1); + assertEquals(tableScanMetadataList.get(0).getServerInstanceToWorkerIdMap().size(), 2); query = "SELECT * FROM d_REALTIME"; dispatchableSubPlan = _queryEnvironment.planQuery(query); - tableScanMetadataList = dispatchableSubPlan.getQueryStageList().stream() - .filter(stageMetadata -> stageMetadata.getTableName() != null).collect(Collectors.toList()); - Assert.assertEquals(tableScanMetadataList.size(), 1); - Assert.assertEquals(tableScanMetadataList.get(0).getServerInstanceToWorkerIdMap().size(), 1); + tableScanMetadataList = + dispatchableSubPlan.getQueryStageList().stream().filter(stageMetadata -> stageMetadata.getTableName() != null) + .collect(Collectors.toList()); + assertEquals(tableScanMetadataList.size(), 1); + assertEquals(tableScanMetadataList.get(0).getServerInstanceToWorkerIdMap().size(), 1); query = "SELECT * FROM d"; dispatchableSubPlan = _queryEnvironment.planQuery(query); - tableScanMetadataList = dispatchableSubPlan.getQueryStageList().stream() - .filter(stageMetadata -> stageMetadata.getTableName() != null).collect(Collectors.toList()); - Assert.assertEquals(tableScanMetadataList.size(), 1); - Assert.assertEquals(tableScanMetadataList.get(0).getServerInstanceToWorkerIdMap().size(), 2); + tableScanMetadataList = + dispatchableSubPlan.getQueryStageList().stream().filter(stageMetadata -> stageMetadata.getTableName() != null) + .collect(Collectors.toList()); + assertEquals(tableScanMetadataList.size(), 1); + assertEquals(tableScanMetadataList.get(0).getServerInstanceToWorkerIdMap().size(), 2); } // Test that plan query can be run as multi-thread. @@ -232,34 +229,36 @@ public class QueryCompilationTest extends QueryEnvironmentTestBase { } for (ArrayList<DispatchableSubPlan> plans : queryPlans.values()) { for (DispatchableSubPlan plan : plans) { - Assert.assertTrue(plan.equals(plans.get(0))); + assertTrue(plan.equals(plans.get(0))); } } } @Test - public void testQueryWithHint() - throws Exception { + public void testQueryWithHint() { // Hinting the query to use final stage aggregation makes server directly return final result // This is useful when data is already partitioned by col1 String query = "SELECT /*+ aggOptionsInternal(agg_type='DIRECT') */ col1, COUNT(*) FROM b GROUP BY col1"; DispatchableSubPlan dispatchableSubPlan = _queryEnvironment.planQuery(query); - Assert.assertEquals(dispatchableSubPlan.getQueryStageList().size(), 2); - for (int stageId = 0; stageId < dispatchableSubPlan.getQueryStageList().size(); stageId++) { - DispatchablePlanFragment dispatchablePlanFragment = dispatchableSubPlan.getQueryStageList().get(stageId); - String tableName = dispatchablePlanFragment.getTableName(); + List<DispatchablePlanFragment> stagePlans = dispatchableSubPlan.getQueryStageList(); + int numStages = stagePlans.size(); + assertEquals(numStages, 2); + for (int stageId = 0; stageId < numStages; stageId++) { + DispatchablePlanFragment stagePlan = stagePlans.get(stageId); + Map<QueryServerInstance, List<Integer>> serverToWorkerIdsMap = stagePlan.getServerInstanceToWorkerIdMap(); + int numServers = serverToWorkerIdsMap.size(); + String tableName = stagePlan.getTableName(); if (tableName != null) { // table scan stages; for tableB it should have only 1 - Assert.assertEquals(dispatchablePlanFragment.getServerInstanceToWorkerIdMap().entrySet().stream() - .map(PhysicalExplainPlanVisitor::stringifyQueryServerInstanceToWorkerIdsEntry) - .collect(Collectors.toSet()), - ImmutableList.of("localhost:1|[0]")); + assertEquals(numServers, 1); + assertEquals(stagePlan.getServerInstanceToWorkerIdMap().keySet().iterator().next().getQueryMailboxPort(), 1); } else if (!PlannerUtils.isRootPlanFragment(stageId)) { // join stage should have both servers used. - Assert.assertEquals(dispatchablePlanFragment.getServerInstanceToWorkerIdMap().entrySet().stream() - .map(PhysicalExplainPlanVisitor::stringifyQueryServerInstanceToWorkerIdsEntry) - .collect(Collectors.toSet()), - ImmutableList.of("localhost:1|[1]", "localhost:2|[0]")); + assertEquals(numServers, 2); + for (QueryServerInstance server : serverToWorkerIdsMap.keySet()) { + int port = server.getQueryMailboxPort(); + assertTrue(port == 1 || port == 2); + } } } } @@ -269,105 +268,103 @@ public class QueryCompilationTest extends QueryEnvironmentTestBase { // A simple filter query with one table String query = "Select * from a where col1 = 'a'"; List<String> tableNames = _queryEnvironment.getTableNamesForQuery(query); - Assert.assertEquals(tableNames.size(), 1); - Assert.assertEquals(tableNames.get(0), "a"); + assertEquals(tableNames.size(), 1); + assertEquals(tableNames.get(0), "a"); // query with IN / NOT IN clause - query = "SELECT COUNT(*) FROM a WHERE col1 IN (SELECT col1 FROM b) " - + "and col1 NOT IN (SELECT col1 from c)"; + query = "SELECT COUNT(*) FROM a WHERE col1 IN (SELECT col1 FROM b) " + "and col1 NOT IN (SELECT col1 from c)"; tableNames = _queryEnvironment.getTableNamesForQuery(query); - Assert.assertEquals(tableNames.size(), 3); + assertEquals(tableNames.size(), 3); Collections.sort(tableNames); - Assert.assertEquals(tableNames.get(0), "a"); - Assert.assertEquals(tableNames.get(1), "b"); - Assert.assertEquals(tableNames.get(2), "c"); + assertEquals(tableNames.get(0), "a"); + assertEquals(tableNames.get(1), "b"); + assertEquals(tableNames.get(2), "c"); // query with JOIN clause query = "SELECT a.col1, b.col2 FROM a JOIN b ON a.col3 = b.col3 WHERE a.col1 = 'a'"; tableNames = _queryEnvironment.getTableNamesForQuery(query); - Assert.assertEquals(tableNames.size(), 2); + assertEquals(tableNames.size(), 2); Collections.sort(tableNames); - Assert.assertEquals(tableNames.get(0), "a"); - Assert.assertEquals(tableNames.get(1), "b"); + assertEquals(tableNames.get(0), "a"); + assertEquals(tableNames.get(1), "b"); // query with WHERE clause JOIN query = "SELECT a.col1, b.col2 FROM a, b WHERE a.col3 = b.col3 AND a.col1 = 'a'"; tableNames = _queryEnvironment.getTableNamesForQuery(query); - Assert.assertEquals(tableNames.size(), 2); + assertEquals(tableNames.size(), 2); Collections.sort(tableNames); - Assert.assertEquals(tableNames.get(0), "a"); - Assert.assertEquals(tableNames.get(1), "b"); + assertEquals(tableNames.get(0), "a"); + assertEquals(tableNames.get(1), "b"); // query with JOIN clause and table alias query = "SELECT A.col1, B.col2 FROM a AS A JOIN b AS B ON A.col3 = B.col3 WHERE A.col1 = 'a'"; tableNames = _queryEnvironment.getTableNamesForQuery(query); - Assert.assertEquals(tableNames.size(), 2); + assertEquals(tableNames.size(), 2); Collections.sort(tableNames); - Assert.assertEquals(tableNames.get(0), "a"); - Assert.assertEquals(tableNames.get(1), "b"); + assertEquals(tableNames.get(0), "a"); + assertEquals(tableNames.get(1), "b"); // query with UNION clause query = "SELECT * FROM a UNION ALL SELECT * FROM b UNION ALL SELECT * FROM c"; tableNames = _queryEnvironment.getTableNamesForQuery(query); - Assert.assertEquals(tableNames.size(), 3); + assertEquals(tableNames.size(), 3); Collections.sort(tableNames); - Assert.assertEquals(tableNames.get(0), "a"); - Assert.assertEquals(tableNames.get(1), "b"); - Assert.assertEquals(tableNames.get(2), "c"); + assertEquals(tableNames.get(0), "a"); + assertEquals(tableNames.get(1), "b"); + assertEquals(tableNames.get(2), "c"); // query with UNION clause and table alias query = "SELECT * FROM (SELECT * FROM a) AS t1 UNION SELECT * FROM ( SELECT * FROM b) AS t2"; tableNames = _queryEnvironment.getTableNamesForQuery(query); - Assert.assertEquals(tableNames.size(), 2); + assertEquals(tableNames.size(), 2); Collections.sort(tableNames); - Assert.assertEquals(tableNames.get(0), "a"); - Assert.assertEquals(tableNames.get(1), "b"); + assertEquals(tableNames.get(0), "a"); + assertEquals(tableNames.get(1), "b"); // query with UNION clause and table alias using WITH clause - query = "WITH tmp1 AS (SELECT * FROM a), \n" - + "tmp2 AS (SELECT * FROM b) \n" + query = "WITH tmp1 AS (SELECT * FROM a), \n" + "tmp2 AS (SELECT * FROM b) \n" + "SELECT * FROM tmp1 UNION ALL SELECT * FROM tmp2"; tableNames = _queryEnvironment.getTableNamesForQuery(query); - Assert.assertEquals(tableNames.size(), 2); + assertEquals(tableNames.size(), 2); Collections.sort(tableNames); - Assert.assertEquals(tableNames.get(0), "a"); - Assert.assertEquals(tableNames.get(1), "b"); + assertEquals(tableNames.get(0), "a"); + assertEquals(tableNames.get(1), "b"); // query with aliases, JOIN, IN/NOT-IN, group-by query = "with tmp as (select col1, sum(col3) as col3, count(*) from a where col1 = 'a' group by col1), " + "tmp2 as (select A.col1, B.col3 from b as A JOIN c AS B on A.col1 = B.col1) " + "select sum(col3) from tmp where col1 in (select col1 from tmp2) and col1 not in (select col1 from d)"; tableNames = _queryEnvironment.getTableNamesForQuery(query); - Assert.assertEquals(tableNames.size(), 4); - Assert.assertEquals(tableNames.get(0), "a"); - Assert.assertEquals(tableNames.get(1), "b"); - Assert.assertEquals(tableNames.get(2), "c"); - Assert.assertEquals(tableNames.get(3), "d"); + assertEquals(tableNames.size(), 4); + assertEquals(tableNames.get(0), "a"); + assertEquals(tableNames.get(1), "b"); + assertEquals(tableNames.get(2), "c"); + assertEquals(tableNames.get(3), "d"); // query with aliases, JOIN, IN/NOT-IN, group-by and explain query = "explain plan for with tmp as (select col1, sum(col3) as col3, count(*) from a where col1 = 'a' " + "group by col1), tmp2 as (select A.col1, B.col3 from b as A JOIN c AS B on A.col1 = B.col1) " + "select sum(col3) from tmp where col1 in (select col1 from tmp2) and col1 not in (select col1 from d)"; tableNames = _queryEnvironment.getTableNamesForQuery(query); - Assert.assertEquals(tableNames.size(), 4); - Assert.assertEquals(tableNames.get(0), "a"); - Assert.assertEquals(tableNames.get(1), "b"); - Assert.assertEquals(tableNames.get(2), "c"); - Assert.assertEquals(tableNames.get(3), "d"); + assertEquals(tableNames.size(), 4); + assertEquals(tableNames.get(0), "a"); + assertEquals(tableNames.get(1), "b"); + assertEquals(tableNames.get(2), "c"); + assertEquals(tableNames.get(3), "d"); // lateral join query query = "EXPLAIN PLAN FOR SELECT a.col1, newb.sum_col3 FROM a JOIN LATERAL " + "(SELECT SUM(col3) as sum_col3 FROM b WHERE col2 = a.col2) AS newb ON TRUE"; tableNames = _queryEnvironment.getTableNamesForQuery(query); - Assert.assertEquals(tableNames.size(), 2); - Assert.assertEquals(tableNames.get(0), "a"); - Assert.assertEquals(tableNames.get(1), "b"); + assertEquals(tableNames.size(), 2); + assertEquals(tableNames.get(0), "a"); + assertEquals(tableNames.get(1), "b"); // test for self join queries query = "SELECT a.col1 FROM a JOIN(SELECT col2 FROM a) as self ON a.col1=self.col2 "; tableNames = _queryEnvironment.getTableNamesForQuery(query); - Assert.assertEquals(tableNames.size(), 1); - Assert.assertEquals(tableNames.get(0), "a"); + assertEquals(tableNames.size(), 1); + assertEquals(tableNames.get(0), "a"); } // -------------------------------------------------------------------------- @@ -375,7 +372,7 @@ public class QueryCompilationTest extends QueryEnvironmentTestBase { // -------------------------------------------------------------------------- private static void assertNodeTypeNotIn(PlanNode node, List<Class<? extends AbstractPlanNode>> bannedNodeType) { - Assert.assertFalse(isOneOf(bannedNodeType, node)); + assertFalse(isOneOf(bannedNodeType, node)); for (PlanNode child : node.getInputs()) { assertNodeTypeNotIn(child, bannedNodeType); } diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/QueryRunner.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/QueryRunner.java index 4796383a0b..fe078e87c6 100644 --- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/QueryRunner.java +++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/QueryRunner.java @@ -34,8 +34,10 @@ import org.apache.pinot.core.query.executor.ServerQueryExecutorV1Impl; import org.apache.pinot.query.mailbox.MailboxService; import org.apache.pinot.query.planner.physical.MailboxIdUtils; import org.apache.pinot.query.planner.plannode.MailboxSendNode; -import org.apache.pinot.query.routing.MailboxMetadata; -import org.apache.pinot.query.routing.VirtualServerAddress; +import org.apache.pinot.query.routing.MailboxInfo; +import org.apache.pinot.query.routing.RoutingInfo; +import org.apache.pinot.query.routing.StageMetadata; +import org.apache.pinot.query.routing.StagePlan; import org.apache.pinot.query.routing.WorkerMetadata; import org.apache.pinot.query.runtime.blocks.TransferableBlock; import org.apache.pinot.query.runtime.executor.ExecutorServiceUtils; @@ -43,8 +45,6 @@ import org.apache.pinot.query.runtime.executor.OpChainSchedulerService; import org.apache.pinot.query.runtime.operator.OpChain; import org.apache.pinot.query.runtime.plan.OpChainExecutionContext; import org.apache.pinot.query.runtime.plan.PhysicalPlanVisitor; -import org.apache.pinot.query.runtime.plan.StageMetadata; -import org.apache.pinot.query.runtime.plan.StagePlan; import org.apache.pinot.query.runtime.plan.pipeline.PipelineBreakerExecutor; import org.apache.pinot.query.runtime.plan.pipeline.PipelineBreakerResult; import org.apache.pinot.query.runtime.plan.server.ServerPlanRequestUtils; @@ -62,6 +62,8 @@ import org.slf4j.LoggerFactory; public class QueryRunner { private static final Logger LOGGER = LoggerFactory.getLogger(QueryRunner.class); + private String _hostname; + private int _port; private HelixManager _helixManager; private ServerMetrics _serverMetrics; @@ -88,15 +90,16 @@ public class QueryRunner { */ public void init(PinotConfiguration config, InstanceDataManager instanceDataManager, HelixManager helixManager, ServerMetrics serverMetrics) { - _helixManager = helixManager; - _serverMetrics = serverMetrics; - String hostname = config.getProperty(CommonConstants.MultiStageQueryRunner.KEY_OF_QUERY_RUNNER_HOSTNAME); if (hostname.startsWith(CommonConstants.Helix.PREFIX_OF_SERVER_INSTANCE)) { hostname = hostname.substring(CommonConstants.Helix.SERVER_INSTANCE_PREFIX_LENGTH); } int port = config.getProperty(CommonConstants.MultiStageQueryRunner.KEY_OF_QUERY_RUNNER_PORT, CommonConstants.MultiStageQueryRunner.DEFAULT_QUERY_RUNNER_PORT); + _hostname = hostname; + _port = port; + _helixManager = helixManager; + _serverMetrics = serverMetrics; // TODO: Consider using separate config for intermediate stage and leaf stage String numGroupsLimitStr = config.getProperty(CommonConstants.Server.CONFIG_OF_QUERY_EXECUTOR_NUM_GROUPS_LIMIT); @@ -163,25 +166,25 @@ public class QueryRunner { // Send error block to all the receivers if pipeline breaker fails if (pipelineBreakerResult != null && pipelineBreakerResult.getErrorBlock() != null) { TransferableBlock errorBlock = pipelineBreakerResult.getErrorBlock(); + int stageId = stageMetadata.getStageId(); LOGGER.error("Error executing pipeline breaker for request: {}, stage: {}, sending error block: {}", requestId, - stagePlan.getStageId(), errorBlock.getExceptions()); + stageId, errorBlock.getExceptions()); int receiverStageId = ((MailboxSendNode) stagePlan.getRootNode()).getReceiverStageId(); - MailboxMetadata mailboxMetadata = workerMetadata.getMailboxMetadataMap().get(receiverStageId); - List<String> mailboxIds = MailboxIdUtils.toMailboxIds(requestId, mailboxMetadata); - List<VirtualServerAddress> virtualAddresses = mailboxMetadata.getVirtualAddresses(); - int numMailboxes = mailboxIds.size(); - for (int i = 0; i < numMailboxes; i++) { - String mailboxId = mailboxIds.get(i); - VirtualServerAddress virtualAddress = virtualAddresses.get(i); + List<MailboxInfo> receiverMailboxInfos = + workerMetadata.getMailboxInfosMap().get(receiverStageId).getMailboxInfos(); + List<RoutingInfo> routingInfos = + MailboxIdUtils.toRoutingInfos(requestId, stageId, workerMetadata.getWorkerId(), receiverStageId, + receiverMailboxInfos); + for (RoutingInfo routingInfo : routingInfos) { try { - _mailboxService.getSendingMailbox(virtualAddress.hostname(), virtualAddress.port(), mailboxId, deadlineMs) - .send(errorBlock); + _mailboxService.getSendingMailbox(routingInfo.getHostname(), routingInfo.getPort(), + routingInfo.getMailboxId(), deadlineMs).send(errorBlock); } catch (TimeoutException e) { - LOGGER.warn("Timed out sending error block to mailbox: {} for request: {}, stage: {}", mailboxId, requestId, - stagePlan.getStageId(), e); + LOGGER.warn("Timed out sending error block to mailbox: {} for request: {}, stage: {}", + routingInfo.getMailboxId(), requestId, stageId, e); } catch (Exception e) { - LOGGER.error("Caught exception sending error block to mailbox: {} for request: {}, stage: {}", mailboxId, - requestId, stagePlan.getStageId(), e); + LOGGER.error("Caught exception sending error block to mailbox: {} for request: {}, stage: {}", + routingInfo.getMailboxId(), requestId, stageId, e); } } return; @@ -189,8 +192,8 @@ public class QueryRunner { // run OpChain OpChainExecutionContext executionContext = - new OpChainExecutionContext(_mailboxService, requestId, stagePlan.getStageId(), deadlineMs, opChainMetadata, - stageMetadata, workerMetadata, pipelineBreakerResult); + new OpChainExecutionContext(_mailboxService, requestId, deadlineMs, opChainMetadata, stageMetadata, + workerMetadata, pipelineBreakerResult); OpChain opChain; if (workerMetadata.isLeafStageWorker()) { opChain = ServerPlanRequestUtils.compileLeafStage(executionContext, stagePlan, _helixManager, _serverMetrics, diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/BaseMailboxReceiveOperator.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/BaseMailboxReceiveOperator.java index 37903c2f72..808caba04e 100644 --- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/BaseMailboxReceiveOperator.java +++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/BaseMailboxReceiveOperator.java @@ -27,7 +27,7 @@ import org.apache.calcite.rel.RelDistribution; import org.apache.pinot.query.mailbox.MailboxService; import org.apache.pinot.query.mailbox.ReceivingMailbox; import org.apache.pinot.query.planner.physical.MailboxIdUtils; -import org.apache.pinot.query.routing.MailboxMetadata; +import org.apache.pinot.query.routing.MailboxInfos; import org.apache.pinot.query.runtime.blocks.TransferableBlock; import org.apache.pinot.query.runtime.operator.utils.AsyncStream; import org.apache.pinot.query.runtime.operator.utils.BlockingMultiStreamConsumer; @@ -58,9 +58,11 @@ public abstract class BaseMailboxReceiveOperator extends MultiStageOperator { _exchangeType = exchangeType; long requestId = context.getRequestId(); - MailboxMetadata mailboxMetadata = context.getWorkerMetadata().getMailboxMetadataMap().get(senderStageId); - if (mailboxMetadata != null && !mailboxMetadata.getMailboxIds().isEmpty()) { - _mailboxIds = MailboxIdUtils.toMailboxIds(requestId, mailboxMetadata); + MailboxInfos mailboxInfos = context.getWorkerMetadata().getMailboxInfosMap().get(senderStageId); + if (mailboxInfos != null) { + _mailboxIds = + MailboxIdUtils.toMailboxIds(requestId, senderStageId, mailboxInfos.getMailboxInfos(), context.getStageId(), + context.getWorkerId()); } else { _mailboxIds = Collections.emptyList(); } diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxSendOperator.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxSendOperator.java index f74feb1cac..5d66d6afac 100644 --- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxSendOperator.java +++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxSendOperator.java @@ -20,11 +20,11 @@ package org.apache.pinot.query.runtime.operator; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; -import java.util.ArrayList; import java.util.Collections; import java.util.EnumSet; import java.util.List; import java.util.concurrent.TimeoutException; +import java.util.stream.Collectors; import javax.annotation.Nullable; import org.apache.calcite.rel.RelDistribution; import org.apache.calcite.rel.RelFieldCollation; @@ -32,8 +32,8 @@ import org.apache.pinot.query.mailbox.MailboxService; import org.apache.pinot.query.mailbox.SendingMailbox; import org.apache.pinot.query.planner.logical.RexExpression; import org.apache.pinot.query.planner.physical.MailboxIdUtils; -import org.apache.pinot.query.routing.MailboxMetadata; -import org.apache.pinot.query.routing.VirtualServerAddress; +import org.apache.pinot.query.routing.MailboxInfo; +import org.apache.pinot.query.routing.RoutingInfo; import org.apache.pinot.query.runtime.blocks.TransferableBlock; import org.apache.pinot.query.runtime.blocks.TransferableBlockUtils; import org.apache.pinot.query.runtime.operator.exchange.BlockExchange; @@ -91,18 +91,14 @@ public class MailboxSendOperator extends MultiStageOperator { long requestId = context.getRequestId(); long deadlineMs = context.getDeadlineMs(); - MailboxMetadata mailboxMetadata = context.getWorkerMetadata().getMailboxMetadataMap().get(receiverStageId); - List<String> sendingMailboxIds = MailboxIdUtils.toMailboxIds(requestId, mailboxMetadata); - List<VirtualServerAddress> sendingAddresses = mailboxMetadata.getVirtualAddresses(); - int numMailboxes = sendingMailboxIds.size(); - List<SendingMailbox> sendingMailboxes = new ArrayList<>(numMailboxes); - for (int i = 0; i < numMailboxes; i++) { - String sendingMailboxId = sendingMailboxIds.get(i); - VirtualServerAddress sendingAddress = sendingAddresses.get(i); - sendingMailboxes.add( - mailboxService.getSendingMailbox(sendingAddress.hostname(), sendingAddress.port(), sendingMailboxId, - deadlineMs)); - } + List<MailboxInfo> mailboxInfos = + context.getWorkerMetadata().getMailboxInfosMap().get(receiverStageId).getMailboxInfos(); + List<RoutingInfo> routingInfos = + MailboxIdUtils.toRoutingInfos(requestId, context.getStageId(), context.getWorkerId(), receiverStageId, + mailboxInfos); + List<SendingMailbox> sendingMailboxes = routingInfos.stream() + .map(v -> mailboxService.getSendingMailbox(v.getHostname(), v.getPort(), v.getMailboxId(), deadlineMs)) + .collect(Collectors.toList()); return BlockExchange.getExchange(sendingMailboxes, distributionType, distributionKeys, TransferableBlockUtils::splitBlock); } diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/OpChainExecutionContext.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/OpChainExecutionContext.java index 5059b2f8ec..51d61e5dbc 100644 --- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/OpChainExecutionContext.java +++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/OpChainExecutionContext.java @@ -21,6 +21,7 @@ package org.apache.pinot.query.runtime.plan; import java.util.Collections; import java.util.Map; import org.apache.pinot.query.mailbox.MailboxService; +import org.apache.pinot.query.routing.StageMetadata; import org.apache.pinot.query.routing.VirtualServerAddress; import org.apache.pinot.query.routing.WorkerMetadata; import org.apache.pinot.query.runtime.operator.OpChainId; @@ -38,11 +39,11 @@ import org.apache.pinot.spi.utils.CommonConstants; public class OpChainExecutionContext { private final MailboxService _mailboxService; private final long _requestId; - private final int _stageId; private final long _deadlineMs; private final Map<String, String> _opChainMetadata; private final StageMetadata _stageMetadata; private final WorkerMetadata _workerMetadata; + private final VirtualServerAddress _server; private final OpChainId _id; private final OpChainStats _stats; private final PipelineBreakerResult _pipelineBreakerResult; @@ -50,17 +51,18 @@ public class OpChainExecutionContext { private ServerPlanRequestContext _leafStageContext; - public OpChainExecutionContext(MailboxService mailboxService, long requestId, int stageId, long deadlineMs, + public OpChainExecutionContext(MailboxService mailboxService, long requestId, long deadlineMs, Map<String, String> opChainMetadata, StageMetadata stageMetadata, WorkerMetadata workerMetadata, PipelineBreakerResult pipelineBreakerResult) { _mailboxService = mailboxService; _requestId = requestId; - _stageId = stageId; _deadlineMs = deadlineMs; _opChainMetadata = Collections.unmodifiableMap(opChainMetadata); _stageMetadata = stageMetadata; _workerMetadata = workerMetadata; - _id = new OpChainId(requestId, workerMetadata.getVirtualAddress().workerId(), stageId); + _server = + new VirtualServerAddress(mailboxService.getHostname(), mailboxService.getPort(), workerMetadata.getWorkerId()); + _id = new OpChainId(requestId, workerMetadata.getWorkerId(), stageMetadata.getStageId()); _stats = new OpChainStats(_id.toString()); _pipelineBreakerResult = pipelineBreakerResult; if (pipelineBreakerResult != null && pipelineBreakerResult.getOpChainStats() != null) { @@ -78,11 +80,15 @@ public class OpChainExecutionContext { } public int getStageId() { - return _stageId; + return _stageMetadata.getStageId(); + } + + public int getWorkerId() { + return _workerMetadata.getWorkerId(); } public VirtualServerAddress getServer() { - return _workerMetadata.getVirtualAddress(); + return _server; } public long getDeadlineMs() { diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/pipeline/PipelineBreakerExecutor.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/pipeline/PipelineBreakerExecutor.java index aec7998e16..a033df03d7 100644 --- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/pipeline/PipelineBreakerExecutor.java +++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/pipeline/PipelineBreakerExecutor.java @@ -29,6 +29,7 @@ import org.apache.pinot.core.common.Operator; import org.apache.pinot.query.mailbox.MailboxService; import org.apache.pinot.query.planner.plannode.MailboxReceiveNode; import org.apache.pinot.query.planner.plannode.PlanNode; +import org.apache.pinot.query.routing.StagePlan; import org.apache.pinot.query.routing.WorkerMetadata; import org.apache.pinot.query.runtime.blocks.TransferableBlock; import org.apache.pinot.query.runtime.blocks.TransferableBlockUtils; @@ -36,7 +37,6 @@ import org.apache.pinot.query.runtime.executor.OpChainSchedulerService; import org.apache.pinot.query.runtime.operator.OpChain; import org.apache.pinot.query.runtime.plan.OpChainExecutionContext; import org.apache.pinot.query.runtime.plan.PhysicalPlanVisitor; -import org.apache.pinot.query.runtime.plan.StagePlan; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -77,12 +77,12 @@ public class PipelineBreakerExecutor { // OpChain receive-mail callbacks. // see also: MailboxIdUtils TODOs, de-couple mailbox id from query information OpChainExecutionContext opChainExecutionContext = - new OpChainExecutionContext(mailboxService, requestId, stagePlan.getStageId(), deadlineMs, opChainMetadata, + new OpChainExecutionContext(mailboxService, requestId, deadlineMs, opChainMetadata, stagePlan.getStageMetadata(), workerMetadata, null); return execute(scheduler, pipelineBreakerContext, opChainExecutionContext); } catch (Exception e) { LOGGER.error("Caught exception executing pipeline breaker for request: {}, stage: {}", requestId, - stagePlan.getStageId(), e); + stagePlan.getStageMetadata().getStageId(), e); return new PipelineBreakerResult(pipelineBreakerContext.getNodeIdMap(), Collections.emptyMap(), TransferableBlockUtils.getErrorTransferableBlock(e), null); } diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/server/ServerPlanRequestContext.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/server/ServerPlanRequestContext.java index 3c03fa1539..cc9d131962 100644 --- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/server/ServerPlanRequestContext.java +++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/server/ServerPlanRequestContext.java @@ -24,7 +24,7 @@ import org.apache.pinot.common.request.PinotQuery; import org.apache.pinot.core.query.executor.QueryExecutor; import org.apache.pinot.core.query.request.ServerQueryRequest; import org.apache.pinot.query.planner.plannode.PlanNode; -import org.apache.pinot.query.runtime.plan.StagePlan; +import org.apache.pinot.query.routing.StagePlan; import org.apache.pinot.query.runtime.plan.pipeline.PipelineBreakerResult; diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/server/ServerPlanRequestUtils.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/server/ServerPlanRequestUtils.java index 4c504f71d6..9b8d19d77d 100644 --- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/server/ServerPlanRequestUtils.java +++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/server/ServerPlanRequestUtils.java @@ -47,11 +47,11 @@ import org.apache.pinot.core.query.request.ServerQueryRequest; import org.apache.pinot.core.routing.TimeBoundaryInfo; import org.apache.pinot.query.planner.plannode.JoinNode; import org.apache.pinot.query.planner.plannode.PlanNode; +import org.apache.pinot.query.routing.StageMetadata; +import org.apache.pinot.query.routing.StagePlan; import org.apache.pinot.query.runtime.operator.OpChain; import org.apache.pinot.query.runtime.plan.OpChainExecutionContext; import org.apache.pinot.query.runtime.plan.PhysicalPlanVisitor; -import org.apache.pinot.query.runtime.plan.StageMetadata; -import org.apache.pinot.query.runtime.plan.StagePlan; import org.apache.pinot.spi.config.table.TableConfig; import org.apache.pinot.spi.config.table.TableType; import org.apache.pinot.spi.data.FieldSpec; diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/dispatch/QueryDispatcher.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/dispatch/QueryDispatcher.java index 06df28f561..f217c9b01a 100644 --- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/dispatch/QueryDispatcher.java +++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/dispatch/QueryDispatcher.java @@ -54,7 +54,9 @@ import org.apache.pinot.query.planner.plannode.AbstractPlanNode; import org.apache.pinot.query.planner.plannode.MailboxReceiveNode; import org.apache.pinot.query.planner.plannode.PlanNode; import org.apache.pinot.query.planner.plannode.StageNodeSerDeUtils; +import org.apache.pinot.query.routing.QueryPlanSerDeUtils; import org.apache.pinot.query.routing.QueryServerInstance; +import org.apache.pinot.query.routing.StageMetadata; import org.apache.pinot.query.routing.WorkerMetadata; import org.apache.pinot.query.runtime.blocks.TransferableBlock; import org.apache.pinot.query.runtime.blocks.TransferableBlockUtils; @@ -63,8 +65,6 @@ import org.apache.pinot.query.runtime.operator.OpChainStats; import org.apache.pinot.query.runtime.operator.OperatorStats; import org.apache.pinot.query.runtime.operator.utils.OperatorUtils; import org.apache.pinot.query.runtime.plan.OpChainExecutionContext; -import org.apache.pinot.query.runtime.plan.StageMetadata; -import org.apache.pinot.query.runtime.plan.serde.QueryPlanSerDeUtils; import org.apache.pinot.spi.trace.RequestContext; import org.apache.pinot.spi.utils.CommonConstants; import org.slf4j.Logger; @@ -170,11 +170,11 @@ public class QueryDispatcher { QueryPlanSerDeUtils.toProtoWorkerMetadataList(workerMetadataList); StageInfo stageInfo = stageInfos.get(i); Worker.StageMetadata stageMetadata = - Worker.StageMetadata.newBuilder().addAllWorkerMetadata(protoWorkerMetadataList) + Worker.StageMetadata.newBuilder().setStageId(stageId).addAllWorkerMetadata(protoWorkerMetadataList) .setCustomProperty(stageInfo._customProperty).build(); requestBuilder.addStagePlan( - Worker.StagePlan.newBuilder().setStageId(stageId).setRootNode(stageInfo._rootNode) - .setStageMetadata(stageMetadata).build()); + Worker.StagePlan.newBuilder().setRootNode(stageInfo._rootNode).setStageMetadata(stageMetadata) + .build()); } } requestBuilder.setMetadata(protoRequestMetadata); @@ -264,10 +264,10 @@ public class QueryDispatcher { List<WorkerMetadata> workerMetadataList = dispatchableStagePlan.getWorkerMetadataList(); Preconditions.checkState(workerMetadataList.size() == 1, "Expecting single worker for reduce stage, got: %s", workerMetadataList.size()); - StageMetadata stageMetadata = new StageMetadata(workerMetadataList, dispatchableStagePlan.getCustomProperties()); + StageMetadata stageMetadata = new StageMetadata(0, workerMetadataList, dispatchableStagePlan.getCustomProperties()); OpChainExecutionContext opChainExecutionContext = - new OpChainExecutionContext(mailboxService, requestId, planFragment.getFragmentId(), - System.currentTimeMillis() + timeoutMs, queryOptions, stageMetadata, workerMetadataList.get(0), null); + new OpChainExecutionContext(mailboxService, requestId, System.currentTimeMillis() + timeoutMs, queryOptions, + stageMetadata, workerMetadataList.get(0), null); MailboxReceiveOperator receiveOperator = new MailboxReceiveOperator(opChainExecutionContext, receiveNode.getDistributionType(), receiveNode.getSenderStageId()); diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/server/QueryServer.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/server/QueryServer.java index 2e52c28a5a..c8caed9100 100644 --- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/server/QueryServer.java +++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/server/QueryServer.java @@ -31,11 +31,11 @@ import org.apache.pinot.common.exception.QueryException; import org.apache.pinot.common.proto.PinotQueryWorkerGrpc; import org.apache.pinot.common.proto.Worker; import org.apache.pinot.common.utils.NamedThreadFactory; +import org.apache.pinot.query.routing.QueryPlanSerDeUtils; +import org.apache.pinot.query.routing.StageMetadata; +import org.apache.pinot.query.routing.StagePlan; import org.apache.pinot.query.routing.WorkerMetadata; import org.apache.pinot.query.runtime.QueryRunner; -import org.apache.pinot.query.runtime.plan.StageMetadata; -import org.apache.pinot.query.runtime.plan.StagePlan; -import org.apache.pinot.query.runtime.plan.serde.QueryPlanSerDeUtils; import org.apache.pinot.query.service.dispatch.QueryDispatcher; import org.apache.pinot.spi.utils.CommonConstants; import org.slf4j.Logger; @@ -124,7 +124,7 @@ public class QueryServer extends PinotQueryWorkerGrpc.PinotQueryWorkerImplBase { } catch (Exception e) { throw new RuntimeException( String.format("Caught exception while deserializing stage plan for request: %d, stage: %d", requestId, - protoStagePlan.getStageId()), e); + protoStagePlan.getStageMetadata().getStageId()), e); } StageMetadata stageMetadata = stagePlan.getStageMetadata(); List<WorkerMetadata> workerMetadataList = stageMetadata.getWorkerMetadataList(); @@ -142,7 +142,7 @@ public class QueryServer extends PinotQueryWorkerGrpc.PinotQueryWorkerImplBase { } catch (Exception e) { throw new RuntimeException( String.format("Caught exception while submitting request: %d, stage: %d", requestId, - protoStagePlan.getStageId()), e); + stageMetadata.getStageId()), e); } finally { for (CompletableFuture<?> future : workerSubmissionStubs) { if (!future.isDone()) { diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/QueryServerEnclosure.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/QueryServerEnclosure.java index 1811218eda..3de9cc6fdb 100644 --- a/pinot-query-runtime/src/test/java/org/apache/pinot/query/QueryServerEnclosure.java +++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/QueryServerEnclosure.java @@ -27,9 +27,9 @@ import org.apache.helix.zookeeper.datamodel.ZNRecord; import org.apache.pinot.common.metrics.ServerMetrics; import org.apache.pinot.common.utils.SchemaUtils; import org.apache.pinot.core.data.manager.InstanceDataManager; +import org.apache.pinot.query.routing.StagePlan; import org.apache.pinot.query.routing.WorkerMetadata; import org.apache.pinot.query.runtime.QueryRunner; -import org.apache.pinot.query.runtime.plan.StagePlan; import org.apache.pinot.query.testutils.MockInstanceDataManagerFactory; import org.apache.pinot.query.testutils.QueryTestUtils; import org.apache.pinot.spi.data.Schema; diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/executor/OpChainSchedulerServiceTest.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/executor/OpChainSchedulerServiceTest.java index c34c858e76..7a35202529 100644 --- a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/executor/OpChainSchedulerServiceTest.java +++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/executor/OpChainSchedulerServiceTest.java @@ -25,13 +25,13 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; import org.apache.pinot.common.utils.NamedThreadFactory; -import org.apache.pinot.query.routing.VirtualServerAddress; +import org.apache.pinot.query.mailbox.MailboxService; +import org.apache.pinot.query.routing.StageMetadata; import org.apache.pinot.query.routing.WorkerMetadata; import org.apache.pinot.query.runtime.blocks.TransferableBlockUtils; import org.apache.pinot.query.runtime.operator.MultiStageOperator; import org.apache.pinot.query.runtime.operator.OpChain; import org.apache.pinot.query.runtime.plan.OpChainExecutionContext; -import org.apache.pinot.query.runtime.plan.StageMetadata; import org.mockito.Mockito; import org.mockito.MockitoAnnotations; import org.testng.Assert; @@ -41,6 +41,8 @@ import org.testng.annotations.BeforeMethod; import org.testng.annotations.Test; import static org.mockito.Mockito.clearInvocations; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; public class OpChainSchedulerServiceTest { @@ -70,10 +72,13 @@ public class OpChainSchedulerServiceTest { } private OpChain getChain(MultiStageOperator operator) { - WorkerMetadata workerMetadata = - new WorkerMetadata(new VirtualServerAddress("localhost", 123, 0), ImmutableMap.of(), ImmutableMap.of()); - OpChainExecutionContext context = new OpChainExecutionContext(null, 123L, 1, Long.MAX_VALUE, ImmutableMap.of(), - new StageMetadata(ImmutableList.of(workerMetadata), ImmutableMap.of()), workerMetadata, null); + MailboxService mailboxService = mock(MailboxService.class); + when(mailboxService.getHostname()).thenReturn("localhost"); + when(mailboxService.getPort()).thenReturn(1234); + WorkerMetadata workerMetadata = new WorkerMetadata(0, ImmutableMap.of(), ImmutableMap.of()); + OpChainExecutionContext context = + new OpChainExecutionContext(mailboxService, 123L, Long.MAX_VALUE, ImmutableMap.of(), + new StageMetadata(0, ImmutableList.of(workerMetadata), ImmutableMap.of()), workerMetadata, null); return new OpChain(context, operator); } diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/MailboxReceiveOperatorTest.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/MailboxReceiveOperatorTest.java index 1a2949b142..2868f6526c 100644 --- a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/MailboxReceiveOperatorTest.java +++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/MailboxReceiveOperatorTest.java @@ -29,13 +29,14 @@ import org.apache.pinot.common.utils.DataSchema; import org.apache.pinot.query.mailbox.MailboxService; import org.apache.pinot.query.mailbox.ReceivingMailbox; import org.apache.pinot.query.planner.physical.MailboxIdUtils; -import org.apache.pinot.query.routing.MailboxMetadata; -import org.apache.pinot.query.routing.VirtualServerAddress; +import org.apache.pinot.query.routing.MailboxInfo; +import org.apache.pinot.query.routing.MailboxInfos; +import org.apache.pinot.query.routing.SharedMailboxInfos; +import org.apache.pinot.query.routing.StageMetadata; import org.apache.pinot.query.routing.WorkerMetadata; import org.apache.pinot.query.runtime.blocks.TransferableBlock; import org.apache.pinot.query.runtime.blocks.TransferableBlockUtils; import org.apache.pinot.query.runtime.plan.OpChainExecutionContext; -import org.apache.pinot.query.runtime.plan.StageMetadata; import org.mockito.Mock; import org.mockito.MockitoAnnotations; import org.testng.annotations.AfterMethod; @@ -70,25 +71,21 @@ public class MailboxReceiveOperatorTest { @BeforeClass public void setUp() { - VirtualServerAddress server1 = new VirtualServerAddress("localhost", 123, 0); - VirtualServerAddress server2 = new VirtualServerAddress("localhost", 123, 1); - _stageMetadataBoth = new StageMetadata(Stream.of(server1, server2).map(s -> new WorkerMetadata(s, ImmutableMap.of(0, - new MailboxMetadata( - ImmutableList.of(MailboxIdUtils.toPlanMailboxId(1, 0, 0, 0), MailboxIdUtils.toPlanMailboxId(1, 1, 0, 0)), - ImmutableList.of(server1, server2)), 1, new MailboxMetadata( - ImmutableList.of(MailboxIdUtils.toPlanMailboxId(1, 0, 0, 0), MailboxIdUtils.toPlanMailboxId(1, 1, 0, 0)), - ImmutableList.of(server1, server2))), ImmutableMap.of())).collect(Collectors.toList()), ImmutableMap.of()); - _stageMetadata1 = new StageMetadata(ImmutableList.of(new WorkerMetadata(server1, ImmutableMap.of(0, - new MailboxMetadata(ImmutableList.of(MailboxIdUtils.toPlanMailboxId(1, 0, 0, 0)), ImmutableList.of(server1)), 1, - new MailboxMetadata(ImmutableList.of(MailboxIdUtils.toPlanMailboxId(1, 0, 0, 0)), ImmutableList.of(server1))), - ImmutableMap.of())), ImmutableMap.of()); + MailboxInfos mailboxInfosBoth = new SharedMailboxInfos(new MailboxInfo("localhost", 1234, ImmutableList.of(0, 1))); + _stageMetadataBoth = new StageMetadata(0, Stream.of(0, 1) + .map(workerId -> new WorkerMetadata(workerId, ImmutableMap.of(1, mailboxInfosBoth), ImmutableMap.of())) + .collect(Collectors.toList()), ImmutableMap.of()); + MailboxInfos mailboxInfos1 = new SharedMailboxInfos(new MailboxInfo("localhost", 1234, ImmutableList.of(0))); + _stageMetadata1 = new StageMetadata(0, + ImmutableList.of(new WorkerMetadata(0, ImmutableMap.of(1, mailboxInfos1), ImmutableMap.of())), + ImmutableMap.of()); } @BeforeMethod public void setUpMethod() { _mocks = MockitoAnnotations.openMocks(this); when(_mailboxService.getHostname()).thenReturn("localhost"); - when(_mailboxService.getPort()).thenReturn(123); + when(_mailboxService.getPort()).thenReturn(1234); } @AfterMethod diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/MailboxSendOperatorTest.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/MailboxSendOperatorTest.java index 86b2ac0000..92f7b2ce7a 100644 --- a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/MailboxSendOperatorTest.java +++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/MailboxSendOperatorTest.java @@ -25,13 +25,12 @@ import java.util.Map; import java.util.concurrent.TimeoutException; import org.apache.pinot.common.utils.DataSchema; import org.apache.pinot.query.mailbox.MailboxService; -import org.apache.pinot.query.routing.VirtualServerAddress; +import org.apache.pinot.query.routing.StageMetadata; import org.apache.pinot.query.routing.WorkerMetadata; import org.apache.pinot.query.runtime.blocks.TransferableBlock; import org.apache.pinot.query.runtime.blocks.TransferableBlockUtils; import org.apache.pinot.query.runtime.operator.exchange.BlockExchange; import org.apache.pinot.query.runtime.plan.OpChainExecutionContext; -import org.apache.pinot.query.runtime.plan.StageMetadata; import org.mockito.ArgumentCaptor; import org.mockito.Mock; import org.testng.annotations.AfterMethod; @@ -52,20 +51,17 @@ public class MailboxSendOperatorTest { private AutoCloseable _mocks; @Mock - private VirtualServerAddress _server; + private MailboxService _mailboxService; @Mock private MultiStageOperator _sourceOperator; @Mock - private MailboxService _mailboxService; - @Mock private BlockExchange _exchange; @BeforeMethod public void setUpMethod() { _mocks = openMocks(this); - when(_server.hostname()).thenReturn("mock"); - when(_server.port()).thenReturn(0); - when(_server.workerId()).thenReturn(0); + when(_mailboxService.getHostname()).thenReturn("localhost"); + when(_mailboxService.getPort()).thenReturn(1234); } @AfterMethod @@ -198,11 +194,12 @@ public class MailboxSendOperatorTest { } private MailboxSendOperator getMailboxSendOperator() { - WorkerMetadata workerMetadata = new WorkerMetadata(_server, ImmutableMap.of(), ImmutableMap.of()); - StageMetadata stageMetadata = new StageMetadata(ImmutableList.of(workerMetadata), ImmutableMap.of()); + WorkerMetadata workerMetadata = new WorkerMetadata(0, ImmutableMap.of(), ImmutableMap.of()); + StageMetadata stageMetadata = + new StageMetadata(SENDER_STAGE_ID, ImmutableList.of(workerMetadata), ImmutableMap.of()); OpChainExecutionContext context = - new OpChainExecutionContext(_mailboxService, 0, SENDER_STAGE_ID, Long.MAX_VALUE, ImmutableMap.of(), - stageMetadata, workerMetadata, null); + new OpChainExecutionContext(_mailboxService, 123L, Long.MAX_VALUE, ImmutableMap.of(), stageMetadata, + workerMetadata, null); return new MailboxSendOperator(context, _sourceOperator, _exchange, null, null, false); } } diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/OpChainTest.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/OpChainTest.java index a74dec4e6f..f4cd38a42e 100644 --- a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/OpChainTest.java +++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/OpChainTest.java @@ -44,15 +44,15 @@ import org.apache.pinot.core.query.request.context.utils.QueryContextConverterUt import org.apache.pinot.query.mailbox.MailboxService; import org.apache.pinot.query.mailbox.ReceivingMailbox; import org.apache.pinot.query.planner.logical.RexExpression; -import org.apache.pinot.query.planner.physical.MailboxIdUtils; -import org.apache.pinot.query.routing.MailboxMetadata; -import org.apache.pinot.query.routing.VirtualServerAddress; +import org.apache.pinot.query.routing.MailboxInfo; +import org.apache.pinot.query.routing.MailboxInfos; +import org.apache.pinot.query.routing.SharedMailboxInfos; +import org.apache.pinot.query.routing.StageMetadata; import org.apache.pinot.query.routing.WorkerMetadata; import org.apache.pinot.query.runtime.blocks.TransferableBlock; import org.apache.pinot.query.runtime.blocks.TransferableBlockUtils; import org.apache.pinot.query.runtime.operator.exchange.BlockExchange; import org.apache.pinot.query.runtime.plan.OpChainExecutionContext; -import org.apache.pinot.query.runtime.plan.StageMetadata; import org.apache.pinot.spi.utils.CommonConstants; import org.mockito.Mock; import org.mockito.MockitoAnnotations; @@ -76,15 +76,12 @@ public class OpChainTest { private final List<TransferableBlock> _blockList = new ArrayList<>(); private final ExecutorService _executor = Executors.newCachedThreadPool(); private final AtomicReference<LeafStageTransferableBlockOperator> _leafOpRef = new AtomicReference<>(); - private final VirtualServerAddress _serverAddress = new VirtualServerAddress("localhost", 123, 0); - private final WorkerMetadata _workerMetadata = new WorkerMetadata(_serverAddress, ImmutableMap.of(0, - new MailboxMetadata(ImmutableList.of(MailboxIdUtils.toPlanMailboxId(0, 0, 0, 0)), - ImmutableList.of(_serverAddress)), 1, - new MailboxMetadata(ImmutableList.of(MailboxIdUtils.toPlanMailboxId(0, 0, 0, 0)), - ImmutableList.of(_serverAddress)), 2, - new MailboxMetadata(ImmutableList.of(MailboxIdUtils.toPlanMailboxId(0, 0, 0, 0)), - ImmutableList.of(_serverAddress))), ImmutableMap.of()); - private final StageMetadata _stageMetadata = new StageMetadata(ImmutableList.of(_workerMetadata), ImmutableMap.of()); + private final MailboxInfos _mailboxInfos = + new SharedMailboxInfos(new MailboxInfo("localhost", 1234, ImmutableList.of(0))); + private final WorkerMetadata _workerMetadata = + new WorkerMetadata(0, ImmutableMap.of(0, _mailboxInfos, 1, _mailboxInfos, 2, _mailboxInfos), ImmutableMap.of()); + private final StageMetadata _stageMetadata = + new StageMetadata(0, ImmutableList.of(_workerMetadata), ImmutableMap.of()); private AutoCloseable _mocks; @Mock @@ -199,13 +196,9 @@ public class OpChainTest { public void testStatsCollectionTracingEnabledMultipleOperators() { long dummyOperatorWaitTime = 1000L; - int receivedStageId = 2; - int senderStageId = 1; - OpChainExecutionContext context = new OpChainExecutionContext(_mailboxService1, 1, senderStageId, Long.MAX_VALUE, + OpChainExecutionContext context = new OpChainExecutionContext(_mailboxService1, 123L, Long.MAX_VALUE, ImmutableMap.of(CommonConstants.Broker.Request.TRACE, "true"), _stageMetadata, _workerMetadata, null); - - Stack<MultiStageOperator> operators = - getFullOpchain(receivedStageId, senderStageId, context, dummyOperatorWaitTime); + Stack<MultiStageOperator> operators = getFullOpChain(context, dummyOperatorWaitTime); OpChain opChain = new OpChain(context, operators.peek()); opChain.getStats().executing(); @@ -214,12 +207,10 @@ public class OpChainTest { } opChain.getStats().queued(); - OpChainExecutionContext secondStageContext = - new OpChainExecutionContext(_mailboxService2, 1, senderStageId + 1, Long.MAX_VALUE, - ImmutableMap.of(CommonConstants.Broker.Request.TRACE, "true"), _stageMetadata, _workerMetadata, null); - + OpChainExecutionContext secondStageContext = new OpChainExecutionContext(_mailboxService2, 123L, Long.MAX_VALUE, + ImmutableMap.of(CommonConstants.Broker.Request.TRACE, "true"), _stageMetadata, _workerMetadata, null); MailboxReceiveOperator secondStageReceiveOp = - new MailboxReceiveOperator(secondStageContext, RelDistribution.Type.BROADCAST_DISTRIBUTED, senderStageId + 1); + new MailboxReceiveOperator(secondStageContext, RelDistribution.Type.BROADCAST_DISTRIBUTED, 1); assertTrue(opChain.getStats().getExecutionTime() >= dummyOperatorWaitTime); int numOperators = operators.size(); @@ -238,14 +229,10 @@ public class OpChainTest { public void testStatsCollectionTracingDisableMultipleOperators() { long dummyOperatorWaitTime = 1000L; - int receivedStageId = 2; - int senderStageId = 1; OpChainExecutionContext context = - new OpChainExecutionContext(_mailboxService1, 1, senderStageId, Long.MAX_VALUE, ImmutableMap.of(), - _stageMetadata, _workerMetadata, null); - - Stack<MultiStageOperator> operators = - getFullOpchain(receivedStageId, senderStageId, context, dummyOperatorWaitTime); + new OpChainExecutionContext(_mailboxService1, 123L, Long.MAX_VALUE, ImmutableMap.of(), _stageMetadata, + _workerMetadata, null); + Stack<MultiStageOperator> operators = getFullOpChain(context, dummyOperatorWaitTime); OpChain opChain = new OpChain(context, operators.peek()); opChain.getStats().executing(); @@ -253,10 +240,10 @@ public class OpChainTest { opChain.getStats().queued(); OpChainExecutionContext secondStageContext = - new OpChainExecutionContext(_mailboxService2, 1, senderStageId + 1, Long.MAX_VALUE, ImmutableMap.of(), - _stageMetadata, _workerMetadata, null); + new OpChainExecutionContext(_mailboxService2, 123L, Long.MAX_VALUE, ImmutableMap.of(), _stageMetadata, + _workerMetadata, null); MailboxReceiveOperator secondStageReceiveOp = - new MailboxReceiveOperator(secondStageContext, RelDistribution.Type.BROADCAST_DISTRIBUTED, senderStageId); + new MailboxReceiveOperator(secondStageContext, RelDistribution.Type.BROADCAST_DISTRIBUTED, 1); assertTrue(opChain.getStats().getExecutionTime() >= dummyOperatorWaitTime); assertEquals(opChain.getStats().getOperatorStatsMap().size(), 2); @@ -275,8 +262,7 @@ public class OpChainTest { assertEquals(secondStageContext.getStats().getOperatorStatsMap().size(), 2); } - private Stack<MultiStageOperator> getFullOpchain(int receivedStageId, int senderStageId, - OpChainExecutionContext context, long waitTimeInMillis) { + private Stack<MultiStageOperator> getFullOpChain(OpChainExecutionContext context, long waitTimeInMillis) { Stack<MultiStageOperator> operators = new Stack<>(); DataSchema upStreamSchema = new DataSchema(new String[]{"intCol"}, new ColumnDataType[]{ColumnDataType.INT}); //Mailbox Receive Operator diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/OperatorTestUtil.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/OperatorTestUtil.java index 3c132269c7..5280724541 100644 --- a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/OperatorTestUtil.java +++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/OperatorTestUtil.java @@ -26,15 +26,18 @@ import java.util.Map; import org.apache.pinot.common.datablock.DataBlock; import org.apache.pinot.common.utils.DataSchema; import org.apache.pinot.query.mailbox.MailboxService; +import org.apache.pinot.query.routing.StageMetadata; import org.apache.pinot.query.routing.VirtualServerAddress; import org.apache.pinot.query.routing.WorkerMetadata; import org.apache.pinot.query.runtime.blocks.TransferableBlock; import org.apache.pinot.query.runtime.operator.utils.OperatorUtils; import org.apache.pinot.query.runtime.plan.OpChainExecutionContext; -import org.apache.pinot.query.runtime.plan.StageMetadata; import org.apache.pinot.query.testutils.MockDataBlockOperatorFactory; import org.apache.pinot.spi.utils.CommonConstants; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + public class OperatorTestUtil { // simple key-value collision schema/data test set: "Aa" and "BB" have same hash code in java. @@ -78,7 +81,7 @@ public class OperatorTestUtil { public static OpChainExecutionContext getOpChainContext(MailboxService mailboxService, long deadlineMs, StageMetadata stageMetadata) { - return new OpChainExecutionContext(mailboxService, 0, 0, deadlineMs, ImmutableMap.of(), stageMetadata, + return new OpChainExecutionContext(mailboxService, 0, deadlineMs, ImmutableMap.of(), stageMetadata, stageMetadata.getWorkerMetadataList().get(0), null); } @@ -91,9 +94,11 @@ public class OperatorTestUtil { } private static OpChainExecutionContext getDefaultContext(Map<String, String> opChainMetadata) { - WorkerMetadata workerMetadata = - new WorkerMetadata(new VirtualServerAddress("mock", 80, 0), ImmutableMap.of(), ImmutableMap.of()); - return new OpChainExecutionContext(null, 1, 2, Long.MAX_VALUE, opChainMetadata, - new StageMetadata(ImmutableList.of(workerMetadata), ImmutableMap.of()), workerMetadata, null); + MailboxService mailboxService = mock(MailboxService.class); + when(mailboxService.getHostname()).thenReturn("localhost"); + when(mailboxService.getPort()).thenReturn(1234); + WorkerMetadata workerMetadata = new WorkerMetadata(0, ImmutableMap.of(), ImmutableMap.of()); + return new OpChainExecutionContext(mailboxService, 123L, Long.MAX_VALUE, opChainMetadata, + new StageMetadata(0, ImmutableList.of(workerMetadata), ImmutableMap.of()), workerMetadata, null); } } diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/SortedMailboxReceiveOperatorTest.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/SortedMailboxReceiveOperatorTest.java index 1e71018215..314081588f 100644 --- a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/SortedMailboxReceiveOperatorTest.java +++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/SortedMailboxReceiveOperatorTest.java @@ -34,13 +34,14 @@ import org.apache.pinot.query.mailbox.MailboxService; import org.apache.pinot.query.mailbox.ReceivingMailbox; import org.apache.pinot.query.planner.logical.RexExpression; import org.apache.pinot.query.planner.physical.MailboxIdUtils; -import org.apache.pinot.query.routing.MailboxMetadata; -import org.apache.pinot.query.routing.VirtualServerAddress; +import org.apache.pinot.query.routing.MailboxInfo; +import org.apache.pinot.query.routing.MailboxInfos; +import org.apache.pinot.query.routing.SharedMailboxInfos; +import org.apache.pinot.query.routing.StageMetadata; import org.apache.pinot.query.routing.WorkerMetadata; import org.apache.pinot.query.runtime.blocks.TransferableBlock; import org.apache.pinot.query.runtime.blocks.TransferableBlockUtils; import org.apache.pinot.query.runtime.plan.OpChainExecutionContext; -import org.apache.pinot.query.runtime.plan.StageMetadata; import org.mockito.Mock; import org.mockito.MockitoAnnotations; import org.testng.annotations.AfterMethod; @@ -78,25 +79,21 @@ public class SortedMailboxReceiveOperatorTest { @BeforeClass public void setUp() { - VirtualServerAddress server1 = new VirtualServerAddress("localhost", 123, 0); - VirtualServerAddress server2 = new VirtualServerAddress("localhost", 123, 1); - _stageMetadataBoth = new StageMetadata(Stream.of(server1, server2).map(s -> new WorkerMetadata(s, ImmutableMap.of(0, - new MailboxMetadata( - ImmutableList.of(MailboxIdUtils.toPlanMailboxId(1, 0, 0, 0), MailboxIdUtils.toPlanMailboxId(1, 1, 0, 0)), - ImmutableList.of(server1, server2)), 1, new MailboxMetadata( - ImmutableList.of(MailboxIdUtils.toPlanMailboxId(1, 0, 0, 0), MailboxIdUtils.toPlanMailboxId(1, 1, 0, 0)), - ImmutableList.of(server1, server2))), ImmutableMap.of())).collect(Collectors.toList()), ImmutableMap.of()); - _stageMetadata1 = new StageMetadata(ImmutableList.of(new WorkerMetadata(server1, ImmutableMap.of(0, - new MailboxMetadata(ImmutableList.of(MailboxIdUtils.toPlanMailboxId(1, 0, 0, 0)), ImmutableList.of(server1)), 1, - new MailboxMetadata(ImmutableList.of(MailboxIdUtils.toPlanMailboxId(1, 0, 0, 0)), ImmutableList.of(server1))), - ImmutableMap.of())), ImmutableMap.of()); + MailboxInfos mailboxInfosBoth = new SharedMailboxInfos(new MailboxInfo("localhost", 1234, ImmutableList.of(0, 1))); + _stageMetadataBoth = new StageMetadata(0, Stream.of(0, 1) + .map(workerId -> new WorkerMetadata(workerId, ImmutableMap.of(1, mailboxInfosBoth), ImmutableMap.of())) + .collect(Collectors.toList()), ImmutableMap.of()); + MailboxInfos mailboxInfos1 = new SharedMailboxInfos(new MailboxInfo("localhost", 1234, ImmutableList.of(0))); + _stageMetadata1 = new StageMetadata(0, + ImmutableList.of(new WorkerMetadata(0, ImmutableMap.of(1, mailboxInfos1), ImmutableMap.of())), + ImmutableMap.of()); } @BeforeMethod public void setUpMethod() { _mocks = MockitoAnnotations.openMocks(this); when(_mailboxService.getHostname()).thenReturn("localhost"); - when(_mailboxService.getPort()).thenReturn(123); + when(_mailboxService.getPort()).thenReturn(1234); } @AfterMethod diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/plan/pipeline/PipelineBreakerExecutorTest.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/plan/pipeline/PipelineBreakerExecutorTest.java index 94d5e2b873..58dcd2106e 100644 --- a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/plan/pipeline/PipelineBreakerExecutorTest.java +++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/plan/pipeline/PipelineBreakerExecutorTest.java @@ -34,7 +34,11 @@ import org.apache.pinot.query.mailbox.ReceivingMailbox; import org.apache.pinot.query.planner.physical.MailboxIdUtils; import org.apache.pinot.query.planner.plannode.JoinNode; import org.apache.pinot.query.planner.plannode.MailboxReceiveNode; -import org.apache.pinot.query.routing.MailboxMetadata; +import org.apache.pinot.query.routing.MailboxInfo; +import org.apache.pinot.query.routing.MailboxInfos; +import org.apache.pinot.query.routing.SharedMailboxInfos; +import org.apache.pinot.query.routing.StageMetadata; +import org.apache.pinot.query.routing.StagePlan; import org.apache.pinot.query.routing.VirtualServerAddress; import org.apache.pinot.query.routing.WorkerMetadata; import org.apache.pinot.query.runtime.blocks.TransferableBlock; @@ -42,8 +46,6 @@ import org.apache.pinot.query.runtime.blocks.TransferableBlockUtils; import org.apache.pinot.query.runtime.executor.ExecutorServiceUtils; import org.apache.pinot.query.runtime.executor.OpChainSchedulerService; import org.apache.pinot.query.runtime.operator.OperatorTestUtil; -import org.apache.pinot.query.runtime.plan.StageMetadata; -import org.apache.pinot.query.runtime.plan.StagePlan; import org.mockito.Mock; import org.mockito.MockitoAnnotations; import org.testng.Assert; @@ -65,13 +67,12 @@ public class PipelineBreakerExecutorTest { private final VirtualServerAddress _server = new VirtualServerAddress("localhost", 123, 0); private final ExecutorService _executor = Executors.newCachedThreadPool(); private final OpChainSchedulerService _scheduler = new OpChainSchedulerService(_executor); - private final WorkerMetadata _workerMetadata = new WorkerMetadata(_server, ImmutableMap.of(0, new MailboxMetadata( - ImmutableList.of(MailboxIdUtils.toPlanMailboxId(1, 0, 0, 0), MailboxIdUtils.toPlanMailboxId(2, 0, 0, 0)), - ImmutableList.of(_server, _server)), 1, - new MailboxMetadata(ImmutableList.of(MailboxIdUtils.toPlanMailboxId(1, 0, 0, 0)), ImmutableList.of(_server)), 2, - new MailboxMetadata(ImmutableList.of(MailboxIdUtils.toPlanMailboxId(2, 0, 0, 0)), ImmutableList.of(_server))), - ImmutableMap.of()); - private final StageMetadata _stageMetadata = new StageMetadata(ImmutableList.of(_workerMetadata), ImmutableMap.of()); + private final MailboxInfos _mailboxInfos = + new SharedMailboxInfos(new MailboxInfo("localhost", 123, ImmutableList.of(0))); + private final WorkerMetadata _workerMetadata = + new WorkerMetadata(0, ImmutableMap.of(1, _mailboxInfos, 2, _mailboxInfos), ImmutableMap.of()); + private final StageMetadata _stageMetadata = + new StageMetadata(0, ImmutableList.of(_workerMetadata), ImmutableMap.of()); private AutoCloseable _mocks; @Mock @@ -107,7 +108,7 @@ public class PipelineBreakerExecutorTest { MailboxReceiveNode mailboxReceiveNode = new MailboxReceiveNode(0, DATA_SCHEMA, 1, RelDistribution.Type.SINGLETON, PinotRelExchangeType.PIPELINE_BREAKER, null, null, false, false, null); - StagePlan stagePlan = new StagePlan(0, mailboxReceiveNode, _stageMetadata); + StagePlan stagePlan = new StagePlan(mailboxReceiveNode, _stageMetadata); // when when(_mailboxService.getReceivingMailbox(MAILBOX_ID_1)).thenReturn(_mailbox1); @@ -145,7 +146,7 @@ public class PipelineBreakerExecutorTest { new JoinNode(0, DATA_SCHEMA, DATA_SCHEMA, DATA_SCHEMA, JoinRelType.INNER, null, null, ImmutableList.of()); joinNode.addInput(mailboxReceiveNode1); joinNode.addInput(mailboxReceiveNode2); - StagePlan stagePlan = new StagePlan(0, joinNode, _stageMetadata); + StagePlan stagePlan = new StagePlan(joinNode, _stageMetadata); // when when(_mailboxService.getReceivingMailbox(MAILBOX_ID_1)).thenReturn(_mailbox1); @@ -181,7 +182,7 @@ public class PipelineBreakerExecutorTest { MailboxReceiveNode incorrectlyConfiguredMailboxNode = new MailboxReceiveNode(0, DATA_SCHEMA, 3, RelDistribution.Type.SINGLETON, PinotRelExchangeType.PIPELINE_BREAKER, null, null, false, false, null); - StagePlan stagePlan = new StagePlan(0, incorrectlyConfiguredMailboxNode, _stageMetadata); + StagePlan stagePlan = new StagePlan(incorrectlyConfiguredMailboxNode, _stageMetadata); // when PipelineBreakerResult pipelineBreakerResult = @@ -204,7 +205,7 @@ public class PipelineBreakerExecutorTest { MailboxReceiveNode mailboxReceiveNode = new MailboxReceiveNode(0, DATA_SCHEMA, 1, RelDistribution.Type.SINGLETON, PinotRelExchangeType.PIPELINE_BREAKER, null, null, false, false, null); - StagePlan stagePlan = new StagePlan(0, mailboxReceiveNode, _stageMetadata); + StagePlan stagePlan = new StagePlan(mailboxReceiveNode, _stageMetadata); // when when(_mailboxService.getReceivingMailbox(MAILBOX_ID_1)).thenReturn(_mailbox1); @@ -240,7 +241,7 @@ public class PipelineBreakerExecutorTest { new JoinNode(0, DATA_SCHEMA, DATA_SCHEMA, DATA_SCHEMA, JoinRelType.INNER, null, null, ImmutableList.of()); joinNode.addInput(mailboxReceiveNode1); joinNode.addInput(incorrectlyConfiguredMailboxNode); - StagePlan stagePlan = new StagePlan(0, joinNode, _stageMetadata); + StagePlan stagePlan = new StagePlan(joinNode, _stageMetadata); // when when(_mailboxService.getReceivingMailbox(MAILBOX_ID_1)).thenReturn(_mailbox1); @@ -278,7 +279,7 @@ public class PipelineBreakerExecutorTest { new JoinNode(0, DATA_SCHEMA, DATA_SCHEMA, DATA_SCHEMA, JoinRelType.INNER, null, null, ImmutableList.of()); joinNode.addInput(mailboxReceiveNode1); joinNode.addInput(incorrectlyConfiguredMailboxNode); - StagePlan stagePlan = new StagePlan(0, joinNode, _stageMetadata); + StagePlan stagePlan = new StagePlan(joinNode, _stageMetadata); // when when(_mailboxService.getReceivingMailbox(MAILBOX_ID_1)).thenReturn(_mailbox1); diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/queries/QueryRunnerTestBase.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/queries/QueryRunnerTestBase.java index f1315aa1bf..298af4deaf 100644 --- a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/queries/QueryRunnerTestBase.java +++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/queries/QueryRunnerTestBase.java @@ -58,9 +58,9 @@ import org.apache.pinot.query.mailbox.MailboxService; import org.apache.pinot.query.planner.physical.DispatchablePlanFragment; import org.apache.pinot.query.planner.physical.DispatchableSubPlan; import org.apache.pinot.query.routing.QueryServerInstance; +import org.apache.pinot.query.routing.StageMetadata; +import org.apache.pinot.query.routing.StagePlan; import org.apache.pinot.query.routing.WorkerMetadata; -import org.apache.pinot.query.runtime.plan.StageMetadata; -import org.apache.pinot.query.runtime.plan.StagePlan; import org.apache.pinot.query.service.dispatch.QueryDispatcher; import org.apache.pinot.spi.data.FieldSpec; import org.apache.pinot.spi.data.Schema; @@ -162,9 +162,9 @@ public abstract class QueryRunnerTestBase extends QueryTestSet { QueryServerEnclosure serverEnclosure = _servers.get(entry.getKey()); List<WorkerMetadata> workerMetadataList = entry.getValue().stream().map(stageWorkerMetadataList::get).collect(Collectors.toList()); - StageMetadata stageMetadata = new StageMetadata(workerMetadataList, dispatchableStagePlan.getCustomProperties()); - StagePlan stagePlan = - new StagePlan(stageId, dispatchableStagePlan.getPlanFragment().getFragmentRoot(), stageMetadata); + StageMetadata stageMetadata = + new StageMetadata(stageId, workerMetadataList, dispatchableStagePlan.getCustomProperties()); + StagePlan stagePlan = new StagePlan(dispatchableStagePlan.getPlanFragment().getFragmentRoot(), stageMetadata); for (WorkerMetadata workerMetadata : workerMetadataList) { submissionStubs.add(serverEnclosure.processQuery(workerMetadata, stagePlan, requestMetadataMap)); } diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/service/server/QueryServerTest.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/service/server/QueryServerTest.java index 679f46c60c..2595d48617 100644 --- a/pinot-query-runtime/src/test/java/org/apache/pinot/query/service/server/QueryServerTest.java +++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/service/server/QueryServerTest.java @@ -42,11 +42,11 @@ import org.apache.pinot.query.planner.physical.DispatchableSubPlan; import org.apache.pinot.query.planner.plannode.AbstractPlanNode; import org.apache.pinot.query.planner.plannode.PlanNode; import org.apache.pinot.query.planner.plannode.StageNodeSerDeUtils; +import org.apache.pinot.query.routing.QueryPlanSerDeUtils; import org.apache.pinot.query.routing.QueryServerInstance; +import org.apache.pinot.query.routing.StageMetadata; import org.apache.pinot.query.routing.WorkerMetadata; import org.apache.pinot.query.runtime.QueryRunner; -import org.apache.pinot.query.runtime.plan.StageMetadata; -import org.apache.pinot.query.runtime.plan.serde.QueryPlanSerDeUtils; import org.apache.pinot.query.testutils.QueryTestUtils; import org.apache.pinot.spi.utils.CommonConstants; import org.apache.pinot.spi.utils.EqualityUtils; @@ -135,7 +135,8 @@ public class QueryServerTest extends QueryTestSet { DispatchablePlanFragment dispatchableStagePlan = stagePlans.get(stageId); List<WorkerMetadata> workerMetadataList = dispatchableStagePlan.getWorkerMetadataList(); - StageMetadata stageMetadata = new StageMetadata(workerMetadataList, dispatchableStagePlan.getCustomProperties()); + StageMetadata stageMetadata = + new StageMetadata(stageId, workerMetadataList, dispatchableStagePlan.getCustomProperties()); // ensure mock query runner received correctly deserialized payload. QueryRunner mockRunner = _queryRunnerMap.get(Integer.parseInt(requestMetadata.get(KEY_OF_SERVER_INSTANCE_PORT))); @@ -190,8 +191,8 @@ public class QueryServerTest extends QueryTestSet { } private static boolean isWorkerMetadataEqual(WorkerMetadata expected, WorkerMetadata actual) { - return expected.getVirtualAddress().equals(actual.getVirtualAddress()) && EqualityUtils.isEqual( - expected.getTableSegmentsMap(), actual.getTableSegmentsMap()); + return expected.getWorkerId() == actual.getWorkerId() && EqualityUtils.isEqual(expected.getTableSegmentsMap(), + actual.getTableSegmentsMap()); } private static boolean isStageNodesEqual(PlanNode left, PlanNode right) { @@ -235,11 +236,10 @@ public class QueryServerTest extends QueryTestSet { // as it is not testing the multi-tenancy dispatch (which is in the QueryDispatcherTest) QueryServerInstance serverInstance = stagePlan.getServerInstanceToWorkerIdMap().keySet().iterator().next(); Worker.StageMetadata stageMetadata = - Worker.StageMetadata.newBuilder().addAllWorkerMetadata(workerMetadataList).setCustomProperty(customProperty) - .build(); + Worker.StageMetadata.newBuilder().setStageId(stageId).addAllWorkerMetadata(workerMetadataList) + .setCustomProperty(customProperty).build(); Worker.StagePlan protoStagePlan = - Worker.StagePlan.newBuilder().setStageId(stageId).setRootNode(rootNode.toByteString()) - .setStageMetadata(stageMetadata).build(); + Worker.StagePlan.newBuilder().setRootNode(rootNode.toByteString()).setStageMetadata(stageMetadata).build(); Map<String, String> requestMetadata = new HashMap<>(); // the default configurations that must exist. --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org