This is an automated email from the ASF dual-hosted git repository.

jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push: new 43eb011c12 In DispatchablePlanMetadata, store worker id to server instance map (#11256) 43eb011c12 is described below commit 43eb011c125a0d35bf1204fb8794293f66633870 Author: Xiaotian (Jackie) Jiang <17555551+jackie-ji...@users.noreply.github.com> AuthorDate: Thu Aug 3 12:05:36 2023 -0700 In DispatchablePlanMetadata, store worker id to server instance map (#11256) --- .../MultiStageBrokerRequestHandler.java | 6 +- .../pinot/query/planner/DispatchableSubPlan.java | 8 +- .../planner/physical/DispatchablePlanContext.java | 57 +++++----- .../planner/physical/DispatchablePlanMetadata.java | 63 +++-------- .../planner/physical/MailboxAssignmentVisitor.java | 125 ++++++++++----------- .../planner/physical/PinotDispatchPlanner.java | 18 +-- .../colocated/GreedyShuffleRewriteVisitor.java | 43 ++++--- .../apache/pinot/query/routing/WorkerManager.java | 55 ++++----- .../apache/pinot/query/QueryCompilationTest.java | 58 +++++----- 9 files changed, 192 insertions(+), 241 deletions(-) diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandler.java b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandler.java index 8bbf955015..f1fb0d5b94 100644 --- a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandler.java +++ b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandler.java @@ -20,7 +20,6 @@ package org.apache.pinot.broker.requesthandler; import com.fasterxml.jackson.databind.JsonNode; import java.util.ArrayList; -import java.util.Arrays; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -245,10 +244,9 @@ public class MultiStageBrokerRequestHandler extends BaseBrokerRequestHandler { brokerResponse.setResultTable(queryResults); dispatchableSubPlan.getTableToUnavailableSegmentsMap().forEach( - (table, segmentList) -> brokerResponse.addToExceptions( + (tableName, unavailableSegments) -> brokerResponse.addToExceptions( new QueryProcessingException(QueryException.SERVER_SEGMENT_MISSING_ERROR_CODE, - String.format("Some segments are unavailable for table %s, unavailable segments: [%s]", table, - Arrays.toString(segmentList.toArray()))))); + String.format("Find unavailable segments: %s for table: %s", unavailableSegments, tableName)))); for (Map.Entry<Integer, ExecutionStatsAggregator> entry : stageIdStatsMap.entrySet()) { if (entry.getKey() == 0) { diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/DispatchableSubPlan.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/DispatchableSubPlan.java index 4f99820351..748c3ac362 100644 --- a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/DispatchableSubPlan.java +++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/DispatchableSubPlan.java @@ -18,7 +18,6 @@ */ package org.apache.pinot.query.planner; -import java.util.Collection; import java.util.List; import java.util.Map; import java.util.Set; @@ -42,11 +41,10 @@ public class DispatchableSubPlan { private final List<Pair<Integer, String>> _queryResultFields; private final List<DispatchablePlanFragment> _queryStageList; private final Set<String> _tableNames; - - private final Map<String, Collection<String>> _tableToUnavailableSegmentsMap; + private final Map<String, Set<String>> _tableToUnavailableSegmentsMap; public DispatchableSubPlan(List<Pair<Integer, String>> 
fields, List<DispatchablePlanFragment> queryStageList, - Set<String> tableNames, Map<String, Collection<String>> tableToUnavailableSegmentsMap) { + Set<String> tableNames, Map<String, Set<String>> tableToUnavailableSegmentsMap) { _queryResultFields = fields; _queryStageList = queryStageList; _tableNames = tableNames; @@ -81,7 +79,7 @@ public class DispatchableSubPlan { * Get the table to unavailable segments map * @return table to unavailable segments map */ - public Map<String, Collection<String>> getTableToUnavailableSegmentsMap() { + public Map<String, Set<String>> getTableToUnavailableSegmentsMap() { return _tableToUnavailableSegmentsMap; } } diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/DispatchablePlanContext.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/DispatchablePlanContext.java index c0fb98d9a5..4699014ad0 100644 --- a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/DispatchablePlanContext.java +++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/DispatchablePlanContext.java @@ -19,6 +19,7 @@ package org.apache.pinot.query.planner.physical; import com.google.common.base.Preconditions; +import java.util.ArrayList; import java.util.Arrays; import java.util.HashMap; import java.util.List; @@ -29,6 +30,7 @@ import org.apache.pinot.query.context.PlannerContext; import org.apache.pinot.query.planner.DispatchablePlanFragment; import org.apache.pinot.query.planner.PlanFragment; import org.apache.pinot.query.planner.plannode.PlanNode; +import org.apache.pinot.query.routing.MailboxMetadata; import org.apache.pinot.query.routing.QueryServerInstance; import org.apache.pinot.query.routing.VirtualServerAddress; import org.apache.pinot.query.routing.WorkerManager; @@ -90,44 +92,49 @@ public class DispatchablePlanContext { DispatchablePlanFragment[] dispatchablePlanFragmentArray = new DispatchablePlanFragment[_dispatchablePlanStageRootMap.size()]; createDispatchablePlanFragmentList(dispatchablePlanFragmentArray, subPlanRoot); - List<DispatchablePlanFragment> dispatchablePlanFragmentList = Arrays.asList(dispatchablePlanFragmentArray); - for (Map.Entry<Integer, DispatchablePlanMetadata> dispatchableEntry : _dispatchablePlanMetadataMap.entrySet()) { - DispatchablePlanMetadata dispatchablePlanMetadata = dispatchableEntry.getValue(); + for (Map.Entry<Integer, DispatchablePlanMetadata> planMetadataEntry : _dispatchablePlanMetadataMap.entrySet()) { + int stageId = planMetadataEntry.getKey(); + DispatchablePlanMetadata dispatchablePlanMetadata = planMetadataEntry.getValue(); // construct each worker metadata - WorkerMetadata[] workerMetadataList = new WorkerMetadata[dispatchablePlanMetadata.getTotalWorkerCount()]; - for (Map.Entry<QueryServerInstance, List<Integer>> queryServerEntry - : dispatchablePlanMetadata.getServerInstanceToWorkerIdMap().entrySet()) { - for (int workerId : queryServerEntry.getValue()) { - VirtualServerAddress virtualServerAddress = new VirtualServerAddress(queryServerEntry.getKey(), workerId); - WorkerMetadata.Builder builder = new WorkerMetadata.Builder(); - builder.setVirtualServerAddress(virtualServerAddress); - if (dispatchablePlanMetadata.getScannedTables().size() == 1) { - builder.addTableSegmentsMap(dispatchablePlanMetadata.getWorkerIdToSegmentsMap().get(workerId)); - } - builder.putAllMailBoxInfosMap(dispatchablePlanMetadata.getWorkerIdToMailBoxIdsMap().get(workerId)); - workerMetadataList[workerId] = builder.build(); + Map<Integer, 
QueryServerInstance> workerIdToServerInstanceMap = + dispatchablePlanMetadata.getWorkerIdToServerInstanceMap(); + Map<Integer, Map<String, List<String>>> workerIdToSegmentsMap = + dispatchablePlanMetadata.getWorkerIdToSegmentsMap(); + Map<Integer, Map<Integer, MailboxMetadata>> workerIdToMailboxesMap = + dispatchablePlanMetadata.getWorkerIdToMailboxesMap(); + Map<QueryServerInstance, List<Integer>> serverInstanceToWorkerIdsMap = new HashMap<>(); + WorkerMetadata[] workerMetadataArray = new WorkerMetadata[workerIdToServerInstanceMap.size()]; + for (Map.Entry<Integer, QueryServerInstance> serverEntry : workerIdToServerInstanceMap.entrySet()) { + int workerId = serverEntry.getKey(); + QueryServerInstance queryServerInstance = serverEntry.getValue(); + serverInstanceToWorkerIdsMap.computeIfAbsent(queryServerInstance, k -> new ArrayList<>()).add(workerId); + WorkerMetadata.Builder workerMetadataBuilder = new WorkerMetadata.Builder().setVirtualServerAddress( + new VirtualServerAddress(queryServerInstance, workerId)); + if (workerIdToSegmentsMap != null) { + workerMetadataBuilder.addTableSegmentsMap(workerIdToSegmentsMap.get(workerId)); } + workerMetadataBuilder.putAllMailBoxInfosMap(workerIdToMailboxesMap.get(workerId)); + workerMetadataArray[workerId] = workerMetadataBuilder.build(); } // set the stageMetadata - int stageId = dispatchableEntry.getKey(); - dispatchablePlanFragmentList.get(stageId).setWorkerMetadataList(Arrays.asList(workerMetadataList)); - dispatchablePlanFragmentList.get(stageId) - .setWorkerIdToSegmentsMap(dispatchablePlanMetadata.getWorkerIdToSegmentsMap()); - dispatchablePlanFragmentList.get(stageId) - .setServerInstanceToWorkerIdMap(dispatchablePlanMetadata.getServerInstanceToWorkerIdMap()); + DispatchablePlanFragment dispatchablePlanFragment = dispatchablePlanFragmentArray[stageId]; + dispatchablePlanFragment.setWorkerMetadataList(Arrays.asList(workerMetadataArray)); + if (workerIdToSegmentsMap != null) { + dispatchablePlanFragment.setWorkerIdToSegmentsMap(workerIdToSegmentsMap); + } + dispatchablePlanFragment.setServerInstanceToWorkerIdMap(serverInstanceToWorkerIdsMap); Preconditions.checkState(dispatchablePlanMetadata.getScannedTables().size() <= 1, "More than one table is not supported yet"); if (dispatchablePlanMetadata.getScannedTables().size() == 1) { - dispatchablePlanFragmentList.get(stageId).setTableName(dispatchablePlanMetadata.getScannedTables().get(0)); + dispatchablePlanFragment.setTableName(dispatchablePlanMetadata.getScannedTables().get(0)); } if (dispatchablePlanMetadata.getTimeBoundaryInfo() != null) { - dispatchablePlanFragmentList.get(stageId) - .setTimeBoundaryInfo(dispatchablePlanMetadata.getTimeBoundaryInfo()); + dispatchablePlanFragment.setTimeBoundaryInfo(dispatchablePlanMetadata.getTimeBoundaryInfo()); } } - return dispatchablePlanFragmentList; + return Arrays.asList(dispatchablePlanFragmentArray); } private void createDispatchablePlanFragmentList(DispatchablePlanFragment[] dispatchablePlanFragmentArray, diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/DispatchablePlanMetadata.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/DispatchablePlanMetadata.java index f53cc008f8..abe4f64a46 100644 --- a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/DispatchablePlanMetadata.java +++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/DispatchablePlanMetadata.java @@ -25,6 +25,7 @@ import java.util.HashMap; import java.util.HashSet; 
import java.util.List; import java.util.Map; +import java.util.Set; import javax.annotation.Nullable; import org.apache.pinot.core.routing.TimeBoundaryInfo; import org.apache.pinot.query.routing.MailboxMetadata; @@ -47,7 +48,7 @@ public class DispatchablePlanMetadata implements Serializable { private Map<String, String> _tableOptions; // used for assigning server/worker nodes. - private Map<QueryServerInstance, List<Integer>> _serverInstanceToWorkerIdMap; + private Map<Integer, QueryServerInstance> _workerIdToServerInstanceMap; // used for table scan stage - we use ServerInstance instead of VirtualServer // here because all virtual servers that share a server instance will have the @@ -59,7 +60,7 @@ public class DispatchablePlanMetadata implements Serializable { private final Map<Integer, Map<Integer, MailboxMetadata>> _workerIdToMailboxesMap; // used for tracking unavailable segments from routing table, then assemble missing segments exception. - private final Map<String, Collection<String>> _tableToUnavailableSegmentsMap; + private final Map<String, Set<String>> _tableToUnavailableSegmentsMap; // time boundary info private TimeBoundaryInfo _timeBoundaryInfo; @@ -70,13 +71,8 @@ public class DispatchablePlanMetadata implements Serializable { // whether a stage is partitioned table scan private boolean _isPartitionedTableScan; - // Total worker count of this stage. - private int _totalWorkerCount; - public DispatchablePlanMetadata() { _scannedTables = new ArrayList<>(); - _serverInstanceToWorkerIdMap = new HashMap<>(); - _workerIdToSegmentsMap = new HashMap<>(); _workerIdToMailboxesMap = new HashMap<>(); _tableToUnavailableSegmentsMap = new HashMap<>(); } @@ -102,6 +98,15 @@ public class DispatchablePlanMetadata implements Serializable { // attached physical plan context. 
// ----------------------------------------------- + public Map<Integer, QueryServerInstance> getWorkerIdToServerInstanceMap() { + return _workerIdToServerInstanceMap; + } + + public void setWorkerIdToServerInstanceMap(Map<Integer, QueryServerInstance> workerIdToServerInstanceMap) { + _workerIdToServerInstanceMap = workerIdToServerInstanceMap; + } + + @Nullable public Map<Integer, Map<String, List<String>>> getWorkerIdToSegmentsMap() { return _workerIdToSegmentsMap; } @@ -110,27 +115,10 @@ public class DispatchablePlanMetadata implements Serializable { _workerIdToSegmentsMap = workerIdToSegmentsMap; } - public Map<Integer, Map<Integer, MailboxMetadata>> getWorkerIdToMailBoxIdsMap() { + public Map<Integer, Map<Integer, MailboxMetadata>> getWorkerIdToMailboxesMap() { return _workerIdToMailboxesMap; } - public void setWorkerIdToMailBoxIdsMap(Map<Integer, Map<Integer, MailboxMetadata>> workerIdToMailboxesMap) { - _workerIdToMailboxesMap.putAll(workerIdToMailboxesMap); - } - - public void addWorkerIdToMailBoxIdsMap(int planFragmentId, - Map<Integer, MailboxMetadata> planFragmentIdToMailboxesMap) { - _workerIdToMailboxesMap.put(planFragmentId, planFragmentIdToMailboxesMap); - } - - public Map<QueryServerInstance, List<Integer>> getServerInstanceToWorkerIdMap() { - return _serverInstanceToWorkerIdMap; - } - - public void setServerInstanceToWorkerIdMap(Map<QueryServerInstance, List<Integer>> serverInstances) { - _serverInstanceToWorkerIdMap = serverInstances; - } - public TimeBoundaryInfo getTimeBoundaryInfo() { return _timeBoundaryInfo; } @@ -155,30 +143,11 @@ public class DispatchablePlanMetadata implements Serializable { _isPartitionedTableScan = isPartitionedTableScan; } - public int getTotalWorkerCount() { - return _totalWorkerCount; - } - - public void setTotalWorkerCount(int totalWorkerCount) { - _totalWorkerCount = totalWorkerCount; - } - - public void addTableToUnavailableSegmentsMap(String table, Collection<String> unavailableSegments) { - if (!_tableToUnavailableSegmentsMap.containsKey(table)) { - _tableToUnavailableSegmentsMap.put(table, new HashSet<>()); - } - _tableToUnavailableSegmentsMap.get(table).addAll(unavailableSegments); - } - - public Map<String, Collection<String>> getTableToUnavailableSegmentsMap() { + public Map<String, Set<String>> getTableToUnavailableSegmentsMap() { return _tableToUnavailableSegmentsMap; } - @Override - public String toString() { - return "DispatchablePlanMetadata{" + "_scannedTables=" + _scannedTables + ", _serverInstanceToWorkerIdMap=" - + _serverInstanceToWorkerIdMap + ", _workerIdToSegmentsMap=" + _workerIdToSegmentsMap - + ", _workerIdToMailboxesMap=" + _workerIdToMailboxesMap + ", _tableToUnavailableSegmentsMap=" - + _tableToUnavailableSegmentsMap + ", _timeBoundaryInfo=" + _timeBoundaryInfo + '}'; + public void addUnavailableSegments(String tableName, Collection<String> unavailableSegments) { + _tableToUnavailableSegmentsMap.computeIfAbsent(tableName, k -> new HashSet<>()).addAll(unavailableSegments); } } diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/MailboxAssignmentVisitor.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/MailboxAssignmentVisitor.java index bc6cb7c83e..a6d758335d 100644 --- a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/MailboxAssignmentVisitor.java +++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/MailboxAssignmentVisitor.java @@ -18,9 +18,9 @@ */ package org.apache.pinot.query.planner.physical; 
+import com.google.common.base.Preconditions; import java.util.Collections; import java.util.HashMap; -import java.util.List; import java.util.Map; import org.apache.calcite.rel.RelDistribution; import org.apache.pinot.query.planner.plannode.DefaultPostOrderTraversalVisitor; @@ -43,79 +43,76 @@ public class MailboxAssignmentVisitor extends DefaultPostOrderTraversalVisitor<V Map<Integer, DispatchablePlanMetadata> metadataMap = context.getDispatchablePlanMetadataMap(); DispatchablePlanMetadata senderMetadata = metadataMap.get(senderFragmentId); DispatchablePlanMetadata receiverMetadata = metadataMap.get(receiverFragmentId); - Map<QueryServerInstance, List<Integer>> senderWorkerIdsMap = senderMetadata.getServerInstanceToWorkerIdMap(); - Map<QueryServerInstance, List<Integer>> receiverWorkerIdsMap = receiverMetadata.getServerInstanceToWorkerIdMap(); - Map<Integer, Map<Integer, MailboxMetadata>> senderMailboxesMap = senderMetadata.getWorkerIdToMailBoxIdsMap(); - Map<Integer, Map<Integer, MailboxMetadata>> receiverMailboxesMap = receiverMetadata.getWorkerIdToMailBoxIdsMap(); + Map<Integer, QueryServerInstance> senderServerMap = senderMetadata.getWorkerIdToServerInstanceMap(); + Map<Integer, QueryServerInstance> receiverServerMap = receiverMetadata.getWorkerIdToServerInstanceMap(); + Map<Integer, Map<Integer, MailboxMetadata>> senderMailboxesMap = senderMetadata.getWorkerIdToMailboxesMap(); + Map<Integer, Map<Integer, MailboxMetadata>> receiverMailboxesMap = receiverMetadata.getWorkerIdToMailboxesMap(); + int numSenders = senderServerMap.size(); + int numReceivers = receiverServerMap.size(); if (sendNode.getDistributionType() == RelDistribution.Type.SINGLETON) { // For SINGLETON exchange type, send the data to the same instance (same worker id) - senderWorkerIdsMap.forEach((serverInstance, workerIds) -> { - for (int workerId : workerIds) { - MailboxMetadata mailboxMetadata = new MailboxMetadata(Collections.singletonList( - MailboxIdUtils.toPlanMailboxId(senderFragmentId, workerId, receiverFragmentId, workerId)), - Collections.singletonList(new VirtualServerAddress(serverInstance, workerId)), Collections.emptyMap()); - senderMailboxesMap.computeIfAbsent(workerId, k -> new HashMap<>()).put(receiverFragmentId, mailboxMetadata); - receiverMailboxesMap.computeIfAbsent(workerId, k -> new HashMap<>()).put(senderFragmentId, mailboxMetadata); - } - }); + Preconditions.checkState(numSenders == numReceivers, + "Got different number of workers for SINGLETON distribution type, sender: %s, receiver: %s", numSenders, + numReceivers); + for (int workerId = 0; workerId < numSenders; workerId++) { + QueryServerInstance senderServer = senderServerMap.get(workerId); + QueryServerInstance receiverServer = receiverServerMap.get(workerId); + Preconditions.checkState(senderServer.equals(receiverServer), + "Got different server for SINGLETON distribution type for worker id: %s, sender: %s, receiver: %s", + workerId, senderServer, receiverServer); + MailboxMetadata mailboxMetadata = new MailboxMetadata(Collections.singletonList( + MailboxIdUtils.toPlanMailboxId(senderFragmentId, workerId, receiverFragmentId, workerId)), + Collections.singletonList(new VirtualServerAddress(senderServer, workerId)), Collections.emptyMap()); + senderMailboxesMap.computeIfAbsent(workerId, k -> new HashMap<>()).put(receiverFragmentId, mailboxMetadata); + receiverMailboxesMap.computeIfAbsent(workerId, k -> new HashMap<>()).put(senderFragmentId, mailboxMetadata); + } } else if (senderMetadata.isPartitionedTableScan()) { // For partitioned table 
scan, send the data to the worker with the same worker id (not necessary the same // instance) // TODO: Support further split the single partition into multiple workers - senderWorkerIdsMap.forEach((senderServerInstance, senderWorkerIds) -> { - for (int workerId : senderWorkerIds) { - receiverWorkerIdsMap.forEach((receiverServerInstance, receiverWorkerIds) -> { - for (int receiverWorkerId : receiverWorkerIds) { - if (receiverWorkerId == workerId) { - String mailboxId = - MailboxIdUtils.toPlanMailboxId(senderFragmentId, workerId, receiverFragmentId, workerId); - MailboxMetadata serderMailboxMetadata = new MailboxMetadata(Collections.singletonList(mailboxId), - Collections.singletonList(new VirtualServerAddress(receiverServerInstance, workerId)), - Collections.emptyMap()); - MailboxMetadata receiverMailboxMetadata = new MailboxMetadata(Collections.singletonList(mailboxId), - Collections.singletonList(new VirtualServerAddress(senderServerInstance, workerId)), - Collections.emptyMap()); - senderMailboxesMap.computeIfAbsent(workerId, k -> new HashMap<>()) - .put(receiverFragmentId, serderMailboxMetadata); - receiverMailboxesMap.computeIfAbsent(workerId, k -> new HashMap<>()) - .put(senderFragmentId, receiverMailboxMetadata); - break; - } - } - }); - } - }); + Preconditions.checkState(numSenders == numReceivers, + "Got different number of workers for partitioned table scan, sender: %s, receiver: %s", numSenders, + numReceivers); + for (int workerId = 0; workerId < numSenders; workerId++) { + String mailboxId = MailboxIdUtils.toPlanMailboxId(senderFragmentId, workerId, receiverFragmentId, workerId); + MailboxMetadata serderMailboxMetadata = new MailboxMetadata(Collections.singletonList(mailboxId), + Collections.singletonList(new VirtualServerAddress(receiverServerMap.get(workerId), workerId)), + Collections.emptyMap()); + MailboxMetadata receiverMailboxMetadata = new MailboxMetadata(Collections.singletonList(mailboxId), + Collections.singletonList(new VirtualServerAddress(senderServerMap.get(workerId), workerId)), + Collections.emptyMap()); + senderMailboxesMap.computeIfAbsent(workerId, k -> new HashMap<>()) + .put(receiverFragmentId, serderMailboxMetadata); + receiverMailboxesMap.computeIfAbsent(workerId, k -> new HashMap<>()) + .put(senderFragmentId, receiverMailboxMetadata); + } } else { // For other exchange types, send the data to all the instances in the receiver fragment - // TODO: - // 1. Add support for more exchange types - // 2. Keep the receiver worker id sequential in the senderMailboxMetadata so that the partitionId aligns with - // the workerId. It is useful for JOIN query when only left table is partitioned. 
- senderWorkerIdsMap.forEach((senderServerInstance, senderWorkerIds) -> { - for (int senderWorkerId : senderWorkerIds) { - Map<Integer, MailboxMetadata> senderMailboxMetadataMap = - senderMailboxesMap.computeIfAbsent(senderWorkerId, k -> new HashMap<>()); - receiverWorkerIdsMap.forEach((receiverServerInstance, receiverWorkerIds) -> { - for (int receiverWorkerId : receiverWorkerIds) { - Map<Integer, MailboxMetadata> receiverMailboxMetadataMap = - receiverMailboxesMap.computeIfAbsent(receiverWorkerId, k -> new HashMap<>()); - String mailboxId = MailboxIdUtils.toPlanMailboxId(senderFragmentId, senderWorkerId, receiverFragmentId, - receiverWorkerId); - MailboxMetadata senderMailboxMetadata = - senderMailboxMetadataMap.computeIfAbsent(receiverFragmentId, k -> new MailboxMetadata()); - senderMailboxMetadata.getMailBoxIdList().add(mailboxId); - senderMailboxMetadata.getVirtualAddressList() - .add(new VirtualServerAddress(receiverServerInstance, receiverWorkerId)); - MailboxMetadata receiverMailboxMetadata = - receiverMailboxMetadataMap.computeIfAbsent(senderFragmentId, k -> new MailboxMetadata()); - receiverMailboxMetadata.getMailBoxIdList().add(mailboxId); - receiverMailboxMetadata.getVirtualAddressList() - .add(new VirtualServerAddress(senderServerInstance, senderWorkerId)); - } - }); + // NOTE: Keep the receiver worker id sequential in the senderMailboxMetadata so that the partitionId aligns with + // the workerId. It is useful for JOIN query when only left table is partitioned. + // TODO: Add support for more exchange types + for (int senderWorkerId = 0; senderWorkerId < numSenders; senderWorkerId++) { + VirtualServerAddress senderAddress = + new VirtualServerAddress(senderServerMap.get(senderWorkerId), senderWorkerId); + MailboxMetadata senderMailboxMetadata = new MailboxMetadata(); + senderMailboxesMap.computeIfAbsent(senderWorkerId, k -> new HashMap<>()) + .put(receiverFragmentId, senderMailboxMetadata); + for (int receiverWorkerId = 0; receiverWorkerId < numReceivers; receiverWorkerId++) { + VirtualServerAddress receiverAddress = + new VirtualServerAddress(receiverServerMap.get(receiverWorkerId), receiverWorkerId); + String mailboxId = + MailboxIdUtils.toPlanMailboxId(senderFragmentId, senderWorkerId, receiverFragmentId, receiverWorkerId); + senderMailboxMetadata.getMailBoxIdList().add(mailboxId); + senderMailboxMetadata.getVirtualAddressList().add(receiverAddress); + + MailboxMetadata receiverMailboxMetadata = + receiverMailboxesMap.computeIfAbsent(receiverWorkerId, k -> new HashMap<>()) + .computeIfAbsent(senderFragmentId, k -> new MailboxMetadata()); + receiverMailboxMetadata.getMailBoxIdList().add(mailboxId); + receiverMailboxMetadata.getVirtualAddressList().add(senderAddress); } - }); + } } } return null; diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/PinotDispatchPlanner.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/PinotDispatchPlanner.java index 35c0a99ef4..d1618f2fc0 100644 --- a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/PinotDispatchPlanner.java +++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/PinotDispatchPlanner.java @@ -18,10 +18,10 @@ */ package org.apache.pinot.query.planner.physical; -import java.util.Collection; import java.util.HashMap; import java.util.HashSet; import java.util.Map; +import java.util.Set; import org.apache.pinot.common.config.provider.TableCache; import org.apache.pinot.query.context.PlannerContext; import 
org.apache.pinot.query.planner.DispatchableSubPlan; @@ -92,18 +92,12 @@ public class PinotDispatchPlanner { populateTableUnavailableSegments(dispatchablePlanContext.getDispatchablePlanMetadataMap())); } - private static Map<String, Collection<String>> populateTableUnavailableSegments( + private static Map<String, Set<String>> populateTableUnavailableSegments( Map<Integer, DispatchablePlanMetadata> dispatchablePlanMetadataMap) { - Map<String, Collection<String>> tableToUnavailableSegments = new HashMap<>(); - dispatchablePlanMetadataMap.values() - .forEach(dispatchablePlanMetadata -> dispatchablePlanMetadata.getTableToUnavailableSegmentsMap().forEach( - (table, segments) -> { - if (!tableToUnavailableSegments.containsKey(table)) { - tableToUnavailableSegments.put(table, new HashSet<>()); - } - tableToUnavailableSegments.get(table).addAll(segments); - } - )); + Map<String, Set<String>> tableToUnavailableSegments = new HashMap<>(); + dispatchablePlanMetadataMap.values().forEach(metadata -> metadata.getTableToUnavailableSegmentsMap().forEach( + (tableName, unavailableSegments) -> tableToUnavailableSegments.computeIfAbsent(tableName, k -> new HashSet<>()) + .addAll(unavailableSegments))); return tableToUnavailableSegments; } } diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/colocated/GreedyShuffleRewriteVisitor.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/colocated/GreedyShuffleRewriteVisitor.java index 50f78e64bf..287db51daa 100644 --- a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/colocated/GreedyShuffleRewriteVisitor.java +++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/colocated/GreedyShuffleRewriteVisitor.java @@ -125,10 +125,8 @@ public class GreedyShuffleRewriteVisitor implements PlanNodeVisitor<Set<Colocati boolean canColocate = canJoinBeColocated(node); // Step-2: Only if the servers assigned to both left and right nodes are equal and the servers assigned to the join // stage are a superset of those servers, can we skip shuffles. - canColocate = - canColocate && canServerAssignmentAllowShuffleSkip(node.getPlanFragmentId(), - innerLeafNodes.get(0).getSenderStageId(), - innerLeafNodes.get(1).getSenderStageId()); + canColocate = canColocate && canServerAssignmentAllowShuffleSkip(node.getPlanFragmentId(), + innerLeafNodes.get(0).getSenderStageId(), innerLeafNodes.get(1).getSenderStageId()); // Step-3: For both left/right MailboxReceiveNode/MailboxSendNode pairs, check whether the key partitioning can // allow shuffle skip. canColocate = canColocate && partitionKeyConditionForJoin(innerLeafNodes.get(0), @@ -140,8 +138,8 @@ public class GreedyShuffleRewriteVisitor implements PlanNodeVisitor<Set<Colocati canColocate = canColocate && checkPartitionScheme(innerLeafNodes.get(0), innerLeafNodes.get(1), context); if (canColocate) { // If shuffle can be skipped, reassign servers. 
- _dispatchablePlanMetadataMap.get(node.getPlanFragmentId()).setServerInstanceToWorkerIdMap( - _dispatchablePlanMetadataMap.get(innerLeafNodes.get(0).getSenderStageId()).getServerInstanceToWorkerIdMap()); + _dispatchablePlanMetadataMap.get(node.getPlanFragmentId()).setWorkerIdToServerInstanceMap( + _dispatchablePlanMetadataMap.get(innerLeafNodes.get(0).getSenderStageId()).getWorkerIdToServerInstanceMap()); _canSkipShuffleForJoin = true; } @@ -174,13 +172,13 @@ public class GreedyShuffleRewriteVisitor implements PlanNodeVisitor<Set<Colocati } else if (colocationKeyCondition(oldColocationKeys, selector) && areServersSuperset(node.getPlanFragmentId(), node.getSenderStageId())) { node.setDistributionType(RelDistribution.Type.SINGLETON); - _dispatchablePlanMetadataMap.get(node.getPlanFragmentId()).setServerInstanceToWorkerIdMap( - _dispatchablePlanMetadataMap.get(node.getSenderStageId()).getServerInstanceToWorkerIdMap()); + _dispatchablePlanMetadataMap.get(node.getPlanFragmentId()).setWorkerIdToServerInstanceMap( + _dispatchablePlanMetadataMap.get(node.getSenderStageId()).getWorkerIdToServerInstanceMap()); return oldColocationKeys; } // This means we can't skip shuffle and there's a partitioning enforced by receiver. - int numPartitions = - _dispatchablePlanMetadataMap.get(node.getPlanFragmentId()).getServerInstanceToWorkerIdMap().size(); + int numPartitions = new HashSet<>( + _dispatchablePlanMetadataMap.get(node.getPlanFragmentId()).getWorkerIdToServerInstanceMap().values()).size(); List<ColocationKey> colocationKeys = ((FieldSelectionKeySelector) selector).getColumnIndices().stream() .map(x -> new ColocationKey(x, numPartitions, selector.hashAlgorithm())).collect(Collectors.toList()); return new HashSet<>(colocationKeys); @@ -196,8 +194,8 @@ public class GreedyShuffleRewriteVisitor implements PlanNodeVisitor<Set<Colocati return new HashSet<>(); } // This means we can't skip shuffle and there's a partitioning enforced by receiver. - int numPartitions = - _dispatchablePlanMetadataMap.get(node.getPlanFragmentId()).getServerInstanceToWorkerIdMap().size(); + int numPartitions = new HashSet<>( + _dispatchablePlanMetadataMap.get(node.getPlanFragmentId()).getWorkerIdToServerInstanceMap().values()).size(); List<ColocationKey> colocationKeys = ((FieldSelectionKeySelector) selector).getColumnIndices().stream() .map(x -> new ColocationKey(x, numPartitions, selector.hashAlgorithm())).collect(Collectors.toList()); return new HashSet<>(colocationKeys); @@ -307,8 +305,9 @@ public class GreedyShuffleRewriteVisitor implements PlanNodeVisitor<Set<Colocati * Checks if servers assigned to the receiver stage are a super-set of the sender stage. */ private boolean areServersSuperset(int receiverStageId, int senderStageId) { - return _dispatchablePlanMetadataMap.get(receiverStageId).getServerInstanceToWorkerIdMap().keySet() - .containsAll(_dispatchablePlanMetadataMap.get(senderStageId).getServerInstanceToWorkerIdMap().keySet()); + return new HashSet<>( + _dispatchablePlanMetadataMap.get(receiverStageId).getWorkerIdToServerInstanceMap().values()).containsAll( + _dispatchablePlanMetadataMap.get(senderStageId).getWorkerIdToServerInstanceMap().values()); } /* @@ -317,15 +316,15 @@ public class GreedyShuffleRewriteVisitor implements PlanNodeVisitor<Set<Colocati * 2. Servers assigned to the join-stage are a superset of S. 
*/ private boolean canServerAssignmentAllowShuffleSkip(int currentStageId, int leftStageId, int rightStageId) { - Set<QueryServerInstance> leftServerInstances = new HashSet<>(_dispatchablePlanMetadataMap.get(leftStageId) - .getServerInstanceToWorkerIdMap().keySet()); - Set<QueryServerInstance> rightServerInstances = _dispatchablePlanMetadataMap.get(rightStageId) - .getServerInstanceToWorkerIdMap().keySet(); - Set<QueryServerInstance> currentServerInstances = _dispatchablePlanMetadataMap.get(currentStageId) - .getServerInstanceToWorkerIdMap().keySet(); + Set<QueryServerInstance> leftServerInstances = + new HashSet<>(_dispatchablePlanMetadataMap.get(leftStageId).getWorkerIdToServerInstanceMap().values()); + Set<QueryServerInstance> rightServerInstances = + new HashSet<>(_dispatchablePlanMetadataMap.get(rightStageId).getWorkerIdToServerInstanceMap().values()); + Set<QueryServerInstance> currentServerInstances = + new HashSet<>(_dispatchablePlanMetadataMap.get(currentStageId).getWorkerIdToServerInstanceMap().values()); return leftServerInstances.containsAll(rightServerInstances) - && leftServerInstances.size() == rightServerInstances.size() - && currentServerInstances.containsAll(leftServerInstances); + && leftServerInstances.size() == rightServerInstances.size() && currentServerInstances.containsAll( + leftServerInstances); } /** diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/routing/WorkerManager.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/routing/WorkerManager.java index 5fdb67c7c8..a57356cb33 100644 --- a/pinot-query-planner/src/main/java/org/apache/pinot/query/routing/WorkerManager.java +++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/routing/WorkerManager.java @@ -73,9 +73,8 @@ public class WorkerManager { // ROOT stage doesn't have a QueryServer as it is strictly only reducing results, so here we simply assign the // worker instance with identical server/mailbox port number. 
DispatchablePlanMetadata metadata = context.getDispatchablePlanMetadataMap().get(0); - metadata.setServerInstanceToWorkerIdMap( - Collections.singletonMap(new QueryServerInstance(_hostName, _port, _port), Collections.singletonList(0))); - metadata.setTotalWorkerCount(1); + metadata.setWorkerIdToServerInstanceMap( + Collections.singletonMap(0, new QueryServerInstance(_hostName, _port, _port))); for (PlanFragment child : rootFragment.getChildren()) { assignWorkersToNonRootFragment(child, context); } @@ -156,21 +155,19 @@ public class WorkerManager { // attach unavailable segments to metadata if (!routingTable.getUnavailableSegments().isEmpty()) { - metadata.addTableToUnavailableSegmentsMap(tableName, routingTable.getUnavailableSegments()); + metadata.addUnavailableSegments(tableName, routingTable.getUnavailableSegments()); } } - int globalIdx = 0; - Map<QueryServerInstance, List<Integer>> serverInstanceToWorkerIdMap = new HashMap<>(); + int workerId = 0; + Map<Integer, QueryServerInstance> workerIdToServerInstanceMap = new HashMap<>(); Map<Integer, Map<String, List<String>>> workerIdToSegmentsMap = new HashMap<>(); for (Map.Entry<ServerInstance, Map<String, List<String>>> entry : serverInstanceToSegmentsMap.entrySet()) { - QueryServerInstance queryServerInstance = new QueryServerInstance(entry.getKey()); - serverInstanceToWorkerIdMap.put(queryServerInstance, Collections.singletonList(globalIdx)); - workerIdToSegmentsMap.put(globalIdx, entry.getValue()); - globalIdx++; + workerIdToServerInstanceMap.put(workerId, new QueryServerInstance(entry.getKey())); + workerIdToSegmentsMap.put(workerId, entry.getValue()); + workerId++; } - metadata.setServerInstanceToWorkerIdMap(serverInstanceToWorkerIdMap); + metadata.setWorkerIdToServerInstanceMap(workerIdToServerInstanceMap); metadata.setWorkerIdToSegmentsMap(workerIdToSegmentsMap); - metadata.setTotalWorkerCount(globalIdx); } /** @@ -219,8 +216,8 @@ public class WorkerManager { // segments for the same partition is colocated long indexToPick = context.getRequestId(); ColocatedPartitionInfo[] partitionInfoMap = colocatedTableInfo._partitionInfoMap; - int nextWorkerId = 0; - Map<QueryServerInstance, List<Integer>> serverInstanceToWorkerIdMap = new HashMap<>(); + int workerId = 0; + Map<Integer, QueryServerInstance> workedIdToServerInstanceMap = new HashMap<>(); Map<Integer, Map<String, List<String>>> workerIdToSegmentsMap = new HashMap<>(); Map<String, ServerInstance> enabledServerInstanceMap = _routingManager.getEnabledServerInstanceMap(); for (int i = 0; i < numPartitions; i++) { @@ -233,15 +230,12 @@ public class WorkerManager { pickEnabledServer(partitionInfo._fullyReplicatedServers, enabledServerInstanceMap, indexToPick++); Preconditions.checkState(serverInstance != null, "Failed to find enabled fully replicated server for table: %s, partition: %s in table: %s", tableName, i); - QueryServerInstance queryServerInstance = new QueryServerInstance(serverInstance); - int workerId = nextWorkerId++; - serverInstanceToWorkerIdMap.computeIfAbsent(queryServerInstance, k -> new ArrayList<>()).add(workerId); + workedIdToServerInstanceMap.put(workerId, new QueryServerInstance(serverInstance)); workerIdToSegmentsMap.put(workerId, getSegmentsMap(partitionInfo)); + workerId++; } - - metadata.setServerInstanceToWorkerIdMap(serverInstanceToWorkerIdMap); + metadata.setWorkerIdToServerInstanceMap(workedIdToServerInstanceMap); metadata.setWorkerIdToSegmentsMap(workerIdToSegmentsMap); - metadata.setTotalWorkerCount(nextWorkerId); 
metadata.setTimeBoundaryInfo(colocatedTableInfo._timeBoundaryInfo); metadata.setPartitionedTableScan(true); } @@ -260,8 +254,7 @@ public class WorkerManager { if (children.size() > 0) { DispatchablePlanMetadata firstChildMetadata = metadataMap.get(children.get(0).getFragmentId()); if (firstChildMetadata.isPartitionedTableScan()) { - metadata.setServerInstanceToWorkerIdMap(firstChildMetadata.getServerInstanceToWorkerIdMap()); - metadata.setTotalWorkerCount(firstChildMetadata.getTotalWorkerCount()); + metadata.setWorkerIdToServerInstanceMap(firstChildMetadata.getWorkerIdToServerInstanceMap()); return; } } @@ -291,22 +284,18 @@ public class WorkerManager { int stageParallelism = Integer.parseInt(options.getOrDefault(QueryOptionKey.STAGE_PARALLELISM, "1")); if (metadata.isRequiresSingletonInstance()) { // require singleton should return a single global worker ID with 0; - ServerInstance serverInstance = serverInstances.get(RANDOM.nextInt(serverInstances.size())); - metadata.setServerInstanceToWorkerIdMap( - Collections.singletonMap(new QueryServerInstance(serverInstance), Collections.singletonList(0))); - metadata.setTotalWorkerCount(1); + metadata.setWorkerIdToServerInstanceMap(Collections.singletonMap(0, + new QueryServerInstance(serverInstances.get(RANDOM.nextInt(serverInstances.size()))))); } else { - Map<QueryServerInstance, List<Integer>> serverInstanceToWorkerIdMap = new HashMap<>(); - int nextWorkerId = 0; + Map<Integer, QueryServerInstance> workerIdToServerInstanceMap = new HashMap<>(); + int workerId = 0; for (ServerInstance serverInstance : serverInstances) { - List<Integer> workerIds = new ArrayList<>(); + QueryServerInstance queryServerInstance = new QueryServerInstance(serverInstance); for (int i = 0; i < stageParallelism; i++) { - workerIds.add(nextWorkerId++); + workerIdToServerInstanceMap.put(workerId++, queryServerInstance); } - serverInstanceToWorkerIdMap.put(new QueryServerInstance(serverInstance), workerIds); } - metadata.setServerInstanceToWorkerIdMap(serverInstanceToWorkerIdMap); - metadata.setTotalWorkerCount(nextWorkerId); + metadata.setWorkerIdToServerInstanceMap(workerIdToServerInstanceMap); } } diff --git a/pinot-query-planner/src/test/java/org/apache/pinot/query/QueryCompilationTest.java b/pinot-query-planner/src/test/java/org/apache/pinot/query/QueryCompilationTest.java index 3015deea4b..e66a08a942 100644 --- a/pinot-query-planner/src/test/java/org/apache/pinot/query/QueryCompilationTest.java +++ b/pinot-query-planner/src/test/java/org/apache/pinot/query/QueryCompilationTest.java @@ -478,40 +478,40 @@ public class QueryCompilationTest extends QueryEnvironmentTestBase { return new Object[][] { new Object[]{"EXPLAIN IMPLEMENTATION PLAN INCLUDING ALL ATTRIBUTES FOR SELECT col1, col3 FROM a", "[0]@localhost:3 MAIL_RECEIVE(RANDOM_DISTRIBUTED)\n" - + "├── [1]@localhost:2 MAIL_SEND(RANDOM_DISTRIBUTED)->{[0]@localhost@{3,3}|[0]}\n" - + "│ └── [1]@localhost:2 PROJECT\n" - + "│ └── [1]@localhost:2 TABLE SCAN (a) null\n" - + "└── [1]@localhost:1 MAIL_SEND(RANDOM_DISTRIBUTED)->{[0]@localhost@{3,3}|[0]}\n" - + " └── [1]@localhost:1 PROJECT\n" - + " └── [1]@localhost:1 TABLE SCAN (a) null\n"}, + + "├── [1]@localhost:1 MAIL_SEND(RANDOM_DISTRIBUTED)->{[0]@localhost@{3,3}|[0]}\n" + + "│ └── [1]@localhost:1 PROJECT\n" + + "│ └── [1]@localhost:1 TABLE SCAN (a) null\n" + + "└── [1]@localhost:2 MAIL_SEND(RANDOM_DISTRIBUTED)->{[0]@localhost@{3,3}|[0]}\n" + + " └── [1]@localhost:2 PROJECT\n" + + " └── [1]@localhost:2 TABLE SCAN (a) null\n"}, new Object[]{"EXPLAIN IMPLEMENTATION PLAN 
EXCLUDING ATTRIBUTES FOR " + "SELECT col1, COUNT(*) FROM a GROUP BY col1", "[0]@localhost:3 MAIL_RECEIVE(RANDOM_DISTRIBUTED)\n" - + "├── [1]@localhost:2 MAIL_SEND(RANDOM_DISTRIBUTED)->{[0]@localhost@{3,3}|[0]} (Subtree Omitted)\n" - + "└── [1]@localhost:1 MAIL_SEND(RANDOM_DISTRIBUTED)->{[0]@localhost@{3,3}|[0]}\n" - + " └── [1]@localhost:1 AGGREGATE_FINAL\n" - + " └── [1]@localhost:1 MAIL_RECEIVE(HASH_DISTRIBUTED)\n" - + " ├── [2]@localhost:2 MAIL_SEND(HASH_DISTRIBUTED)->{[1]@localhost@{2,2}|[0],[1]@localhost@{1,1}|[1]}\n" - + " │ └── [2]@localhost:2 AGGREGATE_LEAF\n" - + " │ └── [2]@localhost:2 TABLE SCAN (a) null\n" - + " └── [2]@localhost:1 MAIL_SEND(HASH_DISTRIBUTED)->{[1]@localhost@{2,2}|[0],[1]@localhost@{1,1}|[1]}\n" - + " └── [2]@localhost:1 AGGREGATE_LEAF\n" - + " └── [2]@localhost:1 TABLE SCAN (a) null\n"}, + + "├── [1]@localhost:1 MAIL_SEND(RANDOM_DISTRIBUTED)->{[0]@localhost@{3,3}|[0]} (Subtree Omitted)\n" + + "└── [1]@localhost:2 MAIL_SEND(RANDOM_DISTRIBUTED)->{[0]@localhost@{3,3}|[0]}\n" + + " └── [1]@localhost:2 AGGREGATE_FINAL\n" + + " └── [1]@localhost:2 MAIL_RECEIVE(HASH_DISTRIBUTED)\n" + + " ├── [2]@localhost:1 MAIL_SEND(HASH_DISTRIBUTED)->{[1]@localhost@{1,1}|[1],[1]@localhost@{2,2}|[0]}\n" + + " │ └── [2]@localhost:1 AGGREGATE_LEAF\n" + + " │ └── [2]@localhost:1 TABLE SCAN (a) null\n" + + " └── [2]@localhost:2 MAIL_SEND(HASH_DISTRIBUTED)->{[1]@localhost@{1,1}|[1],[1]@localhost@{2,2}|[0]}\n" + + " └── [2]@localhost:2 AGGREGATE_LEAF\n" + + " └── [2]@localhost:2 TABLE SCAN (a) null\n"}, new Object[]{"EXPLAIN IMPLEMENTATION PLAN FOR SELECT a.col1, b.col3 FROM a JOIN b ON a.col1 = b.col1", "[0]@localhost:3 MAIL_RECEIVE(RANDOM_DISTRIBUTED)\n" - + "├── [1]@localhost:2 MAIL_SEND(RANDOM_DISTRIBUTED)->{[0]@localhost@{3,3}|[0]} (Subtree Omitted)\n" - + "└── [1]@localhost:1 MAIL_SEND(RANDOM_DISTRIBUTED)->{[0]@localhost@{3,3}|[0]}\n" - + " └── [1]@localhost:1 PROJECT\n" - + " └── [1]@localhost:1 JOIN\n" - + " ├── [1]@localhost:1 MAIL_RECEIVE(HASH_DISTRIBUTED)\n" - + " │ ├── [2]@localhost:2 MAIL_SEND(HASH_DISTRIBUTED)->{[1]@localhost@{2,2}|[0],[1]@localhost@{1,1}|[1]}\n" - + " │ │ └── [2]@localhost:2 PROJECT\n" - + " │ │ └── [2]@localhost:2 TABLE SCAN (a) null\n" - + " │ └── [2]@localhost:1 MAIL_SEND(HASH_DISTRIBUTED)->{[1]@localhost@{2,2}|[0],[1]@localhost@{1,1}|[1]}\n" - + " │ └── [2]@localhost:1 PROJECT\n" - + " │ └── [2]@localhost:1 TABLE SCAN (a) null\n" - + " └── [1]@localhost:1 MAIL_RECEIVE(HASH_DISTRIBUTED)\n" - + " └── [3]@localhost:1 MAIL_SEND(HASH_DISTRIBUTED)->{[1]@localhost@{2,2}|[0],[1]@localhost@{1,1}|[1]}\n" + + "├── [1]@localhost:1 MAIL_SEND(RANDOM_DISTRIBUTED)->{[0]@localhost@{3,3}|[0]} (Subtree Omitted)\n" + + "└── [1]@localhost:2 MAIL_SEND(RANDOM_DISTRIBUTED)->{[0]@localhost@{3,3}|[0]}\n" + + " └── [1]@localhost:2 PROJECT\n" + + " └── [1]@localhost:2 JOIN\n" + + " ├── [1]@localhost:2 MAIL_RECEIVE(HASH_DISTRIBUTED)\n" + + " │ ├── [2]@localhost:1 MAIL_SEND(HASH_DISTRIBUTED)->{[1]@localhost@{1,1}|[1],[1]@localhost@{2,2}|[0]}\n" + + " │ │ └── [2]@localhost:1 PROJECT\n" + + " │ │ └── [2]@localhost:1 TABLE SCAN (a) null\n" + + " │ └── [2]@localhost:2 MAIL_SEND(HASH_DISTRIBUTED)->{[1]@localhost@{1,1}|[1],[1]@localhost@{2,2}|[0]}\n" + + " │ └── [2]@localhost:2 PROJECT\n" + + " │ └── [2]@localhost:2 TABLE SCAN (a) null\n" + + " └── [1]@localhost:2 MAIL_RECEIVE(HASH_DISTRIBUTED)\n" + + " └── [3]@localhost:1 MAIL_SEND(HASH_DISTRIBUTED)->{[1]@localhost@{1,1}|[1],[1]@localhost@{2,2}|[0]}\n" + " └── [3]@localhost:1 PROJECT\n" + " └── [3]@localhost:1 TABLE SCAN (b) null\n"} }; 
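The core of this change flips the assignment map in DispatchablePlanMetadata from Map<QueryServerInstance, List<Integer>> to Map<Integer, QueryServerInstance>, with callers deriving the inverse server-to-worker-ids view and the worker count on demand (as the DispatchablePlanContext hunk above does). Below is a minimal, self-contained sketch of that derivation; WorkerAssignmentSketch and ServerKey are hypothetical stand-ins (ServerKey plays the role of Pinot's QueryServerInstance: hostname plus service and mailbox ports), and the port numbers are arbitrary example values.

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class WorkerAssignmentSketch {

  // Hypothetical stand-in for org.apache.pinot.query.routing.QueryServerInstance
  // (hostname + query service port + query mailbox port), used only so this sketch
  // compiles without Pinot on the classpath. Requires JDK 16+ for records.
  record ServerKey(String hostname, int servicePort, int mailboxPort) {
  }

  // Derive the server -> worker ids view (the shape DispatchablePlanFragment still
  // exposes) from the worker id -> server map that DispatchablePlanMetadata now stores.
  static Map<ServerKey, List<Integer>> toServerToWorkerIds(Map<Integer, ServerKey> workerIdToServer) {
    Map<ServerKey, List<Integer>> serverToWorkerIds = new HashMap<>();
    for (Map.Entry<Integer, ServerKey> entry : workerIdToServer.entrySet()) {
      serverToWorkerIds.computeIfAbsent(entry.getValue(), k -> new ArrayList<>()).add(entry.getKey());
    }
    return serverToWorkerIds;
  }

  public static void main(String[] args) {
    // Worker ids are assigned densely (0..n-1), so the removed _totalWorkerCount is
    // simply the size of the map. Ports below are arbitrary example values.
    Map<Integer, ServerKey> workerIdToServer = new HashMap<>();
    workerIdToServer.put(0, new ServerKey("localhost", 8421, 8422));
    workerIdToServer.put(1, new ServerKey("localhost", 8421, 8422));
    workerIdToServer.put(2, new ServerKey("localhost", 8431, 8432));

    int totalWorkerCount = workerIdToServer.size();
    Map<ServerKey, List<Integer>> serverToWorkerIds = toServerToWorkerIds(workerIdToServer);
    System.out.println("workers=" + totalWorkerCount + ", byServer=" + serverToWorkerIds);
  }
}

This mirrors how DispatchablePlanContext rebuilds serverInstanceToWorkerIdsMap in the hunk above, and why new WorkerMetadata[workerIdToServerInstanceMap.size()] can replace the removed getTotalWorkerCount().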