This is an automated email from the ASF dual-hosted git repository.

jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 43eb011c12 In DispatchablePlanMetadata, store worker id to server instance map (#11256)
43eb011c12 is described below

commit 43eb011c125a0d35bf1204fb8794293f66633870
Author: Xiaotian (Jackie) Jiang <17555551+jackie-ji...@users.noreply.github.com>
AuthorDate: Thu Aug 3 12:05:36 2023 -0700

    In DispatchablePlanMetadata, store worker id to server instance map (#11256)
---
 .../MultiStageBrokerRequestHandler.java            |   6 +-
 .../pinot/query/planner/DispatchableSubPlan.java   |   8 +-
 .../planner/physical/DispatchablePlanContext.java  |  57 +++++-----
 .../planner/physical/DispatchablePlanMetadata.java |  63 +++--------
 .../planner/physical/MailboxAssignmentVisitor.java | 125 ++++++++++-----------
 .../planner/physical/PinotDispatchPlanner.java     |  18 +--
 .../colocated/GreedyShuffleRewriteVisitor.java     |  43 ++++---
 .../apache/pinot/query/routing/WorkerManager.java  |  55 ++++-----
 .../apache/pinot/query/QueryCompilationTest.java   |  58 +++++-----
 9 files changed, 192 insertions(+), 241 deletions(-)
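
A quick orientation for readers skimming the diff below: the core change is that DispatchablePlanMetadata now stores a workerId -> QueryServerInstance map instead of a QueryServerInstance -> List<workerId> map plus a separate total worker count. What follows is only a minimal, standalone Java sketch of that shape (plain strings stand in for QueryServerInstance, and the class and variable names are illustrative, not the actual Pinot types); the grouped server -> worker-ids view is derived on demand, as DispatchablePlanContext now does.

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class WorkerAssignmentSketch {
  public static void main(String[] args) {
    // New shape: workerId -> server. The worker count is implicit in the
    // map size, so no separate totalWorkerCount field is needed.
    Map<Integer, String> workerIdToServer = new HashMap<>();
    workerIdToServer.put(0, "server-1:8098");
    workerIdToServer.put(1, "server-2:8098");
    workerIdToServer.put(2, "server-1:8098");

    // Old shape (server -> worker ids), derived only where a grouped view
    // is still needed, e.g. when building per-stage dispatch metadata.
    Map<String, List<Integer>> serverToWorkerIds = new HashMap<>();
    workerIdToServer.forEach(
        (workerId, server) -> serverToWorkerIds.computeIfAbsent(server, k -> new ArrayList<>()).add(workerId));

    System.out.println("total workers: " + workerIdToServer.size());
    System.out.println("server -> worker ids: " + serverToWorkerIds);
  }
}

Keying by worker id also gives direct indexed lookups per worker, which is what the rewritten MailboxAssignmentVisitor loops in this patch rely on.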

diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandler.java b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandler.java
index 8bbf955015..f1fb0d5b94 100644
--- a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandler.java
+++ b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandler.java
@@ -20,7 +20,6 @@ package org.apache.pinot.broker.requesthandler;
 
 import com.fasterxml.jackson.databind.JsonNode;
 import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -245,10 +244,9 @@ public class MultiStageBrokerRequestHandler extends BaseBrokerRequestHandler {
     brokerResponse.setResultTable(queryResults);
 
     dispatchableSubPlan.getTableToUnavailableSegmentsMap().forEach(
-        (table, segmentList) -> brokerResponse.addToExceptions(
+        (tableName, unavailableSegments) -> brokerResponse.addToExceptions(
             new QueryProcessingException(QueryException.SERVER_SEGMENT_MISSING_ERROR_CODE,
-                String.format("Some segments are unavailable for table %s, unavailable segments: [%s]", table,
-                    Arrays.toString(segmentList.toArray())))));
+                String.format("Find unavailable segments: %s for table: %s", unavailableSegments, tableName))));
 
     for (Map.Entry<Integer, ExecutionStatsAggregator> entry : stageIdStatsMap.entrySet()) {
       if (entry.getKey() == 0) {
diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/DispatchableSubPlan.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/DispatchableSubPlan.java
index 4f99820351..748c3ac362 100644
--- a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/DispatchableSubPlan.java
+++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/DispatchableSubPlan.java
@@ -18,7 +18,6 @@
  */
 package org.apache.pinot.query.planner;
 
-import java.util.Collection;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -42,11 +41,10 @@ public class DispatchableSubPlan {
   private final List<Pair<Integer, String>> _queryResultFields;
   private final List<DispatchablePlanFragment> _queryStageList;
   private final Set<String> _tableNames;
-
-  private final Map<String, Collection<String>> _tableToUnavailableSegmentsMap;
+  private final Map<String, Set<String>> _tableToUnavailableSegmentsMap;
 
   public DispatchableSubPlan(List<Pair<Integer, String>> fields, List<DispatchablePlanFragment> queryStageList,
-      Set<String> tableNames, Map<String, Collection<String>> tableToUnavailableSegmentsMap) {
+      Set<String> tableNames, Map<String, Set<String>> tableToUnavailableSegmentsMap) {
     _queryResultFields = fields;
     _queryStageList = queryStageList;
     _tableNames = tableNames;
@@ -81,7 +79,7 @@ public class DispatchableSubPlan {
    * Get the table to unavailable segments map
    * @return table to unavailable segments map
    */
-  public Map<String, Collection<String>> getTableToUnavailableSegmentsMap() {
+  public Map<String, Set<String>> getTableToUnavailableSegmentsMap() {
     return _tableToUnavailableSegmentsMap;
   }
 }
diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/DispatchablePlanContext.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/DispatchablePlanContext.java
index c0fb98d9a5..4699014ad0 100644
--- a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/DispatchablePlanContext.java
+++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/DispatchablePlanContext.java
@@ -19,6 +19,7 @@
 package org.apache.pinot.query.planner.physical;
 
 import com.google.common.base.Preconditions;
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.HashMap;
 import java.util.List;
@@ -29,6 +30,7 @@ import org.apache.pinot.query.context.PlannerContext;
 import org.apache.pinot.query.planner.DispatchablePlanFragment;
 import org.apache.pinot.query.planner.PlanFragment;
 import org.apache.pinot.query.planner.plannode.PlanNode;
+import org.apache.pinot.query.routing.MailboxMetadata;
 import org.apache.pinot.query.routing.QueryServerInstance;
 import org.apache.pinot.query.routing.VirtualServerAddress;
 import org.apache.pinot.query.routing.WorkerManager;
@@ -90,44 +92,49 @@ public class DispatchablePlanContext {
     DispatchablePlanFragment[] dispatchablePlanFragmentArray =
         new DispatchablePlanFragment[_dispatchablePlanStageRootMap.size()];
     createDispatchablePlanFragmentList(dispatchablePlanFragmentArray, subPlanRoot);
-    List<DispatchablePlanFragment> dispatchablePlanFragmentList = Arrays.asList(dispatchablePlanFragmentArray);
-    for (Map.Entry<Integer, DispatchablePlanMetadata> dispatchableEntry : _dispatchablePlanMetadataMap.entrySet()) {
-      DispatchablePlanMetadata dispatchablePlanMetadata = dispatchableEntry.getValue();
+    for (Map.Entry<Integer, DispatchablePlanMetadata> planMetadataEntry : _dispatchablePlanMetadataMap.entrySet()) {
+      int stageId = planMetadataEntry.getKey();
+      DispatchablePlanMetadata dispatchablePlanMetadata = planMetadataEntry.getValue();
 
       // construct each worker metadata
-      WorkerMetadata[] workerMetadataList = new WorkerMetadata[dispatchablePlanMetadata.getTotalWorkerCount()];
-      for (Map.Entry<QueryServerInstance, List<Integer>> queryServerEntry
-          : dispatchablePlanMetadata.getServerInstanceToWorkerIdMap().entrySet()) {
-        for (int workerId : queryServerEntry.getValue()) {
-          VirtualServerAddress virtualServerAddress = new VirtualServerAddress(queryServerEntry.getKey(), workerId);
-          WorkerMetadata.Builder builder = new WorkerMetadata.Builder();
-          builder.setVirtualServerAddress(virtualServerAddress);
-          if (dispatchablePlanMetadata.getScannedTables().size() == 1) {
-            builder.addTableSegmentsMap(dispatchablePlanMetadata.getWorkerIdToSegmentsMap().get(workerId));
-          }
-          builder.putAllMailBoxInfosMap(dispatchablePlanMetadata.getWorkerIdToMailBoxIdsMap().get(workerId));
-          workerMetadataList[workerId] = builder.build();
+      Map<Integer, QueryServerInstance> workerIdToServerInstanceMap =
+          dispatchablePlanMetadata.getWorkerIdToServerInstanceMap();
+      Map<Integer, Map<String, List<String>>> workerIdToSegmentsMap =
+          dispatchablePlanMetadata.getWorkerIdToSegmentsMap();
+      Map<Integer, Map<Integer, MailboxMetadata>> workerIdToMailboxesMap =
+          dispatchablePlanMetadata.getWorkerIdToMailboxesMap();
+      Map<QueryServerInstance, List<Integer>> serverInstanceToWorkerIdsMap = new HashMap<>();
+      WorkerMetadata[] workerMetadataArray = new WorkerMetadata[workerIdToServerInstanceMap.size()];
+      for (Map.Entry<Integer, QueryServerInstance> serverEntry : workerIdToServerInstanceMap.entrySet()) {
+        int workerId = serverEntry.getKey();
+        QueryServerInstance queryServerInstance = serverEntry.getValue();
+        serverInstanceToWorkerIdsMap.computeIfAbsent(queryServerInstance, k -> new ArrayList<>()).add(workerId);
+        WorkerMetadata.Builder workerMetadataBuilder = new WorkerMetadata.Builder().setVirtualServerAddress(
+            new VirtualServerAddress(queryServerInstance, workerId));
+        if (workerIdToSegmentsMap != null) {
+          workerMetadataBuilder.addTableSegmentsMap(workerIdToSegmentsMap.get(workerId));
         }
+        workerMetadataBuilder.putAllMailBoxInfosMap(workerIdToMailboxesMap.get(workerId));
+        workerMetadataArray[workerId] = workerMetadataBuilder.build();
       }
 
       // set the stageMetadata
-      int stageId = dispatchableEntry.getKey();
-      dispatchablePlanFragmentList.get(stageId).setWorkerMetadataList(Arrays.asList(workerMetadataList));
-      dispatchablePlanFragmentList.get(stageId)
-          .setWorkerIdToSegmentsMap(dispatchablePlanMetadata.getWorkerIdToSegmentsMap());
-      dispatchablePlanFragmentList.get(stageId)
-          .setServerInstanceToWorkerIdMap(dispatchablePlanMetadata.getServerInstanceToWorkerIdMap());
+      DispatchablePlanFragment dispatchablePlanFragment = dispatchablePlanFragmentArray[stageId];
+      dispatchablePlanFragment.setWorkerMetadataList(Arrays.asList(workerMetadataArray));
+      if (workerIdToSegmentsMap != null) {
+        dispatchablePlanFragment.setWorkerIdToSegmentsMap(workerIdToSegmentsMap);
+      }
+      dispatchablePlanFragment.setServerInstanceToWorkerIdMap(serverInstanceToWorkerIdsMap);
       Preconditions.checkState(dispatchablePlanMetadata.getScannedTables().size() <= 1,
           "More than one table is not supported yet");
       if (dispatchablePlanMetadata.getScannedTables().size() == 1) {
-        dispatchablePlanFragmentList.get(stageId).setTableName(dispatchablePlanMetadata.getScannedTables().get(0));
+        dispatchablePlanFragment.setTableName(dispatchablePlanMetadata.getScannedTables().get(0));
       }
       if (dispatchablePlanMetadata.getTimeBoundaryInfo() != null) {
-        dispatchablePlanFragmentList.get(stageId)
-            .setTimeBoundaryInfo(dispatchablePlanMetadata.getTimeBoundaryInfo());
+        dispatchablePlanFragment.setTimeBoundaryInfo(dispatchablePlanMetadata.getTimeBoundaryInfo());
       }
     }
-    return dispatchablePlanFragmentList;
+    return Arrays.asList(dispatchablePlanFragmentArray);
   }
 
   private void createDispatchablePlanFragmentList(DispatchablePlanFragment[] dispatchablePlanFragmentArray,
diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/DispatchablePlanMetadata.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/DispatchablePlanMetadata.java
index f53cc008f8..abe4f64a46 100644
--- a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/DispatchablePlanMetadata.java
+++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/DispatchablePlanMetadata.java
@@ -25,6 +25,7 @@ import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import javax.annotation.Nullable;
 import org.apache.pinot.core.routing.TimeBoundaryInfo;
 import org.apache.pinot.query.routing.MailboxMetadata;
@@ -47,7 +48,7 @@ public class DispatchablePlanMetadata implements Serializable {
   private Map<String, String> _tableOptions;
 
   // used for assigning server/worker nodes.
-  private Map<QueryServerInstance, List<Integer>> _serverInstanceToWorkerIdMap;
+  private Map<Integer, QueryServerInstance> _workerIdToServerInstanceMap;
 
   // used for table scan stage - we use ServerInstance instead of VirtualServer
   // here because all virtual servers that share a server instance will have the
@@ -59,7 +60,7 @@ public class DispatchablePlanMetadata implements Serializable {
   private final Map<Integer, Map<Integer, MailboxMetadata>> _workerIdToMailboxesMap;
 
   // used for tracking unavailable segments from routing table, then assemble missing segments exception.
-  private final Map<String, Collection<String>> _tableToUnavailableSegmentsMap;
+  private final Map<String, Set<String>> _tableToUnavailableSegmentsMap;
 
   // time boundary info
   private TimeBoundaryInfo _timeBoundaryInfo;
@@ -70,13 +71,8 @@ public class DispatchablePlanMetadata implements Serializable {
   // whether a stage is partitioned table scan
   private boolean _isPartitionedTableScan;
 
-  // Total worker count of this stage.
-  private int _totalWorkerCount;
-
   public DispatchablePlanMetadata() {
     _scannedTables = new ArrayList<>();
-    _serverInstanceToWorkerIdMap = new HashMap<>();
-    _workerIdToSegmentsMap = new HashMap<>();
     _workerIdToMailboxesMap = new HashMap<>();
     _tableToUnavailableSegmentsMap = new HashMap<>();
   }
@@ -102,6 +98,15 @@ public class DispatchablePlanMetadata implements Serializable {
   // attached physical plan context.
   // -----------------------------------------------
 
+  public Map<Integer, QueryServerInstance> getWorkerIdToServerInstanceMap() {
+    return _workerIdToServerInstanceMap;
+  }
+
+  public void setWorkerIdToServerInstanceMap(Map<Integer, QueryServerInstance> workerIdToServerInstanceMap) {
+    _workerIdToServerInstanceMap = workerIdToServerInstanceMap;
+  }
+
+  @Nullable
   public Map<Integer, Map<String, List<String>>> getWorkerIdToSegmentsMap() {
     return _workerIdToSegmentsMap;
   }
@@ -110,27 +115,10 @@ public class DispatchablePlanMetadata implements Serializable {
     _workerIdToSegmentsMap = workerIdToSegmentsMap;
   }
 
-  public Map<Integer, Map<Integer, MailboxMetadata>> getWorkerIdToMailBoxIdsMap() {
+  public Map<Integer, Map<Integer, MailboxMetadata>> getWorkerIdToMailboxesMap() {
     return _workerIdToMailboxesMap;
   }
 
-  public void setWorkerIdToMailBoxIdsMap(Map<Integer, Map<Integer, MailboxMetadata>> workerIdToMailboxesMap) {
-    _workerIdToMailboxesMap.putAll(workerIdToMailboxesMap);
-  }
-
-  public void addWorkerIdToMailBoxIdsMap(int planFragmentId,
-      Map<Integer, MailboxMetadata> planFragmentIdToMailboxesMap) {
-    _workerIdToMailboxesMap.put(planFragmentId, planFragmentIdToMailboxesMap);
-  }
-
-  public Map<QueryServerInstance, List<Integer>> getServerInstanceToWorkerIdMap() {
-    return _serverInstanceToWorkerIdMap;
-  }
-
-  public void setServerInstanceToWorkerIdMap(Map<QueryServerInstance, List<Integer>> serverInstances) {
-    _serverInstanceToWorkerIdMap = serverInstances;
-  }
-
   public TimeBoundaryInfo getTimeBoundaryInfo() {
     return _timeBoundaryInfo;
   }
@@ -155,30 +143,11 @@ public class DispatchablePlanMetadata implements Serializable {
     _isPartitionedTableScan = isPartitionedTableScan;
   }
 
-  public int getTotalWorkerCount() {
-    return _totalWorkerCount;
-  }
-
-  public void setTotalWorkerCount(int totalWorkerCount) {
-    _totalWorkerCount = totalWorkerCount;
-  }
-
-  public void addTableToUnavailableSegmentsMap(String table, Collection<String> unavailableSegments) {
-    if (!_tableToUnavailableSegmentsMap.containsKey(table)) {
-      _tableToUnavailableSegmentsMap.put(table, new HashSet<>());
-    }
-    _tableToUnavailableSegmentsMap.get(table).addAll(unavailableSegments);
-  }
-
-  public Map<String, Collection<String>> getTableToUnavailableSegmentsMap() {
+  public Map<String, Set<String>> getTableToUnavailableSegmentsMap() {
     return _tableToUnavailableSegmentsMap;
   }
 
-  @Override
-  public String toString() {
-    return "DispatchablePlanMetadata{" + "_scannedTables=" + _scannedTables + ", _serverInstanceToWorkerIdMap="
-        + _serverInstanceToWorkerIdMap + ", _workerIdToSegmentsMap=" + _workerIdToSegmentsMap
-        + ", _workerIdToMailboxesMap=" + _workerIdToMailboxesMap + ", _tableToUnavailableSegmentsMap="
-        + _tableToUnavailableSegmentsMap + ", _timeBoundaryInfo=" + _timeBoundaryInfo + '}';
+  public void addUnavailableSegments(String tableName, Collection<String> unavailableSegments) {
+    _tableToUnavailableSegmentsMap.computeIfAbsent(tableName, k -> new HashSet<>()).addAll(unavailableSegments);
   }
 }
diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/MailboxAssignmentVisitor.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/MailboxAssignmentVisitor.java
index bc6cb7c83e..a6d758335d 100644
--- a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/MailboxAssignmentVisitor.java
+++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/MailboxAssignmentVisitor.java
@@ -18,9 +18,9 @@
  */
 package org.apache.pinot.query.planner.physical;
 
+import com.google.common.base.Preconditions;
 import java.util.Collections;
 import java.util.HashMap;
-import java.util.List;
 import java.util.Map;
 import org.apache.calcite.rel.RelDistribution;
 import org.apache.pinot.query.planner.plannode.DefaultPostOrderTraversalVisitor;
@@ -43,79 +43,76 @@ public class MailboxAssignmentVisitor extends DefaultPostOrderTraversalVisitor<V
       Map<Integer, DispatchablePlanMetadata> metadataMap = context.getDispatchablePlanMetadataMap();
       DispatchablePlanMetadata senderMetadata = metadataMap.get(senderFragmentId);
       DispatchablePlanMetadata receiverMetadata = metadataMap.get(receiverFragmentId);
-      Map<QueryServerInstance, List<Integer>> senderWorkerIdsMap = senderMetadata.getServerInstanceToWorkerIdMap();
-      Map<QueryServerInstance, List<Integer>> receiverWorkerIdsMap = receiverMetadata.getServerInstanceToWorkerIdMap();
-      Map<Integer, Map<Integer, MailboxMetadata>> senderMailboxesMap = senderMetadata.getWorkerIdToMailBoxIdsMap();
-      Map<Integer, Map<Integer, MailboxMetadata>> receiverMailboxesMap = receiverMetadata.getWorkerIdToMailBoxIdsMap();
+      Map<Integer, QueryServerInstance> senderServerMap = senderMetadata.getWorkerIdToServerInstanceMap();
+      Map<Integer, QueryServerInstance> receiverServerMap = receiverMetadata.getWorkerIdToServerInstanceMap();
+      Map<Integer, Map<Integer, MailboxMetadata>> senderMailboxesMap = senderMetadata.getWorkerIdToMailboxesMap();
+      Map<Integer, Map<Integer, MailboxMetadata>> receiverMailboxesMap = receiverMetadata.getWorkerIdToMailboxesMap();
 
+      int numSenders = senderServerMap.size();
+      int numReceivers = receiverServerMap.size();
       if (sendNode.getDistributionType() == RelDistribution.Type.SINGLETON) {
         // For SINGLETON exchange type, send the data to the same instance (same worker id)
-        senderWorkerIdsMap.forEach((serverInstance, workerIds) -> {
-          for (int workerId : workerIds) {
-            MailboxMetadata mailboxMetadata = new MailboxMetadata(Collections.singletonList(
-                MailboxIdUtils.toPlanMailboxId(senderFragmentId, workerId, receiverFragmentId, workerId)),
-                Collections.singletonList(new VirtualServerAddress(serverInstance, workerId)), Collections.emptyMap());
-            senderMailboxesMap.computeIfAbsent(workerId, k -> new HashMap<>()).put(receiverFragmentId, mailboxMetadata);
-            receiverMailboxesMap.computeIfAbsent(workerId, k -> new HashMap<>()).put(senderFragmentId, mailboxMetadata);
-          }
-        });
+        Preconditions.checkState(numSenders == numReceivers,
+            "Got different number of workers for SINGLETON distribution type, sender: %s, receiver: %s", numSenders,
+            numReceivers);
+        for (int workerId = 0; workerId < numSenders; workerId++) {
+          QueryServerInstance senderServer = senderServerMap.get(workerId);
+          QueryServerInstance receiverServer = receiverServerMap.get(workerId);
+          Preconditions.checkState(senderServer.equals(receiverServer),
+              "Got different server for SINGLETON distribution type for worker id: %s, sender: %s, receiver: %s",
+              workerId, senderServer, receiverServer);
+          MailboxMetadata mailboxMetadata = new MailboxMetadata(Collections.singletonList(
+              MailboxIdUtils.toPlanMailboxId(senderFragmentId, workerId, receiverFragmentId, workerId)),
+              Collections.singletonList(new VirtualServerAddress(senderServer, workerId)), Collections.emptyMap());
+          senderMailboxesMap.computeIfAbsent(workerId, k -> new HashMap<>()).put(receiverFragmentId, mailboxMetadata);
+          receiverMailboxesMap.computeIfAbsent(workerId, k -> new HashMap<>()).put(senderFragmentId, mailboxMetadata);
+        }
       } else if (senderMetadata.isPartitionedTableScan()) {
         // For partitioned table scan, send the data to the worker with the same worker id (not necessary the same
         // instance)
         // TODO: Support further split the single partition into multiple workers
-        senderWorkerIdsMap.forEach((senderServerInstance, senderWorkerIds) -> {
-          for (int workerId : senderWorkerIds) {
-            receiverWorkerIdsMap.forEach((receiverServerInstance, receiverWorkerIds) -> {
-              for (int receiverWorkerId : receiverWorkerIds) {
-                if (receiverWorkerId == workerId) {
-                  String mailboxId =
-                      MailboxIdUtils.toPlanMailboxId(senderFragmentId, workerId, receiverFragmentId, workerId);
-                  MailboxMetadata serderMailboxMetadata = new MailboxMetadata(Collections.singletonList(mailboxId),
-                      Collections.singletonList(new VirtualServerAddress(receiverServerInstance, workerId)),
-                      Collections.emptyMap());
-                  MailboxMetadata receiverMailboxMetadata = new MailboxMetadata(Collections.singletonList(mailboxId),
-                      Collections.singletonList(new VirtualServerAddress(senderServerInstance, workerId)),
-                      Collections.emptyMap());
-                  senderMailboxesMap.computeIfAbsent(workerId, k -> new HashMap<>())
-                      .put(receiverFragmentId, serderMailboxMetadata);
-                  receiverMailboxesMap.computeIfAbsent(workerId, k -> new HashMap<>())
-                      .put(senderFragmentId, receiverMailboxMetadata);
-                  break;
-                }
-              }
-            });
-          }
-        });
+        Preconditions.checkState(numSenders == numReceivers,
+            "Got different number of workers for partitioned table scan, sender: %s, receiver: %s", numSenders,
+            numReceivers);
+        for (int workerId = 0; workerId < numSenders; workerId++) {
+          String mailboxId = MailboxIdUtils.toPlanMailboxId(senderFragmentId, workerId, receiverFragmentId, workerId);
+          MailboxMetadata serderMailboxMetadata = new MailboxMetadata(Collections.singletonList(mailboxId),
+              Collections.singletonList(new VirtualServerAddress(receiverServerMap.get(workerId), workerId)),
+              Collections.emptyMap());
+          MailboxMetadata receiverMailboxMetadata = new MailboxMetadata(Collections.singletonList(mailboxId),
+              Collections.singletonList(new VirtualServerAddress(senderServerMap.get(workerId), workerId)),
+              Collections.emptyMap());
+          senderMailboxesMap.computeIfAbsent(workerId, k -> new HashMap<>())
+              .put(receiverFragmentId, serderMailboxMetadata);
+          receiverMailboxesMap.computeIfAbsent(workerId, k -> new HashMap<>())
+              .put(senderFragmentId, receiverMailboxMetadata);
+        }
       } else {
         // For other exchange types, send the data to all the instances in the receiver fragment
-        // TODO:
-        //   1. Add support for more exchange types
-        //   2. Keep the receiver worker id sequential in the senderMailboxMetadata so that the partitionId aligns with
-        //      the workerId. It is useful for JOIN query when only left table is partitioned.
-        senderWorkerIdsMap.forEach((senderServerInstance, senderWorkerIds) -> {
-          for (int senderWorkerId : senderWorkerIds) {
-            Map<Integer, MailboxMetadata> senderMailboxMetadataMap =
-                senderMailboxesMap.computeIfAbsent(senderWorkerId, k -> new HashMap<>());
-            receiverWorkerIdsMap.forEach((receiverServerInstance, receiverWorkerIds) -> {
-              for (int receiverWorkerId : receiverWorkerIds) {
-                Map<Integer, MailboxMetadata> receiverMailboxMetadataMap =
-                    receiverMailboxesMap.computeIfAbsent(receiverWorkerId, k -> new HashMap<>());
-                String mailboxId = MailboxIdUtils.toPlanMailboxId(senderFragmentId, senderWorkerId, receiverFragmentId,
-                    receiverWorkerId);
-                MailboxMetadata senderMailboxMetadata =
-                    senderMailboxMetadataMap.computeIfAbsent(receiverFragmentId, k -> new MailboxMetadata());
-                senderMailboxMetadata.getMailBoxIdList().add(mailboxId);
-                senderMailboxMetadata.getVirtualAddressList()
-                    .add(new VirtualServerAddress(receiverServerInstance, receiverWorkerId));
-                MailboxMetadata receiverMailboxMetadata =
-                    receiverMailboxMetadataMap.computeIfAbsent(senderFragmentId, k -> new MailboxMetadata());
-                receiverMailboxMetadata.getMailBoxIdList().add(mailboxId);
-                receiverMailboxMetadata.getVirtualAddressList()
-                    .add(new VirtualServerAddress(senderServerInstance, senderWorkerId));
-              }
-            });
+        // NOTE: Keep the receiver worker id sequential in the senderMailboxMetadata so that the partitionId aligns with
+        //       the workerId. It is useful for JOIN query when only left table is partitioned.
+        // TODO: Add support for more exchange types
+        for (int senderWorkerId = 0; senderWorkerId < numSenders; senderWorkerId++) {
+          VirtualServerAddress senderAddress =
+              new VirtualServerAddress(senderServerMap.get(senderWorkerId), senderWorkerId);
+          MailboxMetadata senderMailboxMetadata = new MailboxMetadata();
+          senderMailboxesMap.computeIfAbsent(senderWorkerId, k -> new HashMap<>())
+              .put(receiverFragmentId, senderMailboxMetadata);
+          for (int receiverWorkerId = 0; receiverWorkerId < numReceivers; receiverWorkerId++) {
+            VirtualServerAddress receiverAddress =
+                new VirtualServerAddress(receiverServerMap.get(receiverWorkerId), receiverWorkerId);
+            String mailboxId =
+                MailboxIdUtils.toPlanMailboxId(senderFragmentId, senderWorkerId, receiverFragmentId, receiverWorkerId);
+            senderMailboxMetadata.getMailBoxIdList().add(mailboxId);
+            senderMailboxMetadata.getVirtualAddressList().add(receiverAddress);
+
+            MailboxMetadata receiverMailboxMetadata =
+                receiverMailboxesMap.computeIfAbsent(receiverWorkerId, k -> new HashMap<>())
+                    .computeIfAbsent(senderFragmentId, k -> new MailboxMetadata());
+            receiverMailboxMetadata.getMailBoxIdList().add(mailboxId);
+            receiverMailboxMetadata.getVirtualAddressList().add(senderAddress);
           }
-        });
+        }
       }
     }
     return null;
diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/PinotDispatchPlanner.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/PinotDispatchPlanner.java
index 35c0a99ef4..d1618f2fc0 100644
--- a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/PinotDispatchPlanner.java
+++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/PinotDispatchPlanner.java
@@ -18,10 +18,10 @@
  */
 package org.apache.pinot.query.planner.physical;
 
-import java.util.Collection;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Map;
+import java.util.Set;
 import org.apache.pinot.common.config.provider.TableCache;
 import org.apache.pinot.query.context.PlannerContext;
 import org.apache.pinot.query.planner.DispatchableSubPlan;
@@ -92,18 +92,12 @@ public class PinotDispatchPlanner {
         populateTableUnavailableSegments(dispatchablePlanContext.getDispatchablePlanMetadataMap()));
   }
 
-  private static Map<String, Collection<String>> populateTableUnavailableSegments(
+  private static Map<String, Set<String>> populateTableUnavailableSegments(
       Map<Integer, DispatchablePlanMetadata> dispatchablePlanMetadataMap) {
-    Map<String, Collection<String>> tableToUnavailableSegments = new HashMap<>();
-    dispatchablePlanMetadataMap.values()
-        .forEach(dispatchablePlanMetadata -> dispatchablePlanMetadata.getTableToUnavailableSegmentsMap().forEach(
-            (table, segments) -> {
-              if (!tableToUnavailableSegments.containsKey(table)) {
-                tableToUnavailableSegments.put(table, new HashSet<>());
-              }
-              tableToUnavailableSegments.get(table).addAll(segments);
-            }
-        ));
+    Map<String, Set<String>> tableToUnavailableSegments = new HashMap<>();
+    dispatchablePlanMetadataMap.values().forEach(metadata -> metadata.getTableToUnavailableSegmentsMap().forEach(
+        (tableName, unavailableSegments) -> tableToUnavailableSegments.computeIfAbsent(tableName, k -> new HashSet<>())
+            .addAll(unavailableSegments)));
     return tableToUnavailableSegments;
   }
 }
diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/colocated/GreedyShuffleRewriteVisitor.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/colocated/GreedyShuffleRewriteVisitor.java
index 50f78e64bf..287db51daa 100644
--- a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/colocated/GreedyShuffleRewriteVisitor.java
+++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/colocated/GreedyShuffleRewriteVisitor.java
@@ -125,10 +125,8 @@ public class GreedyShuffleRewriteVisitor implements PlanNodeVisitor<Set<Colocati
     boolean canColocate = canJoinBeColocated(node);
     // Step-2: Only if the servers assigned to both left and right nodes are equal and the servers assigned to the join
     //         stage are a superset of those servers, can we skip shuffles.
-    canColocate =
-        canColocate && canServerAssignmentAllowShuffleSkip(node.getPlanFragmentId(),
-            innerLeafNodes.get(0).getSenderStageId(),
-            innerLeafNodes.get(1).getSenderStageId());
+    canColocate = canColocate && canServerAssignmentAllowShuffleSkip(node.getPlanFragmentId(),
+        innerLeafNodes.get(0).getSenderStageId(), innerLeafNodes.get(1).getSenderStageId());
     // Step-3: For both left/right MailboxReceiveNode/MailboxSendNode pairs, check whether the key partitioning can
     //         allow shuffle skip.
     canColocate = canColocate && partitionKeyConditionForJoin(innerLeafNodes.get(0),
@@ -140,8 +138,8 @@ public class GreedyShuffleRewriteVisitor implements PlanNodeVisitor<Set<Colocati
     canColocate = canColocate && checkPartitionScheme(innerLeafNodes.get(0), innerLeafNodes.get(1), context);
     if (canColocate) {
       // If shuffle can be skipped, reassign servers.
-      _dispatchablePlanMetadataMap.get(node.getPlanFragmentId()).setServerInstanceToWorkerIdMap(
-          _dispatchablePlanMetadataMap.get(innerLeafNodes.get(0).getSenderStageId()).getServerInstanceToWorkerIdMap());
+      _dispatchablePlanMetadataMap.get(node.getPlanFragmentId()).setWorkerIdToServerInstanceMap(
+          _dispatchablePlanMetadataMap.get(innerLeafNodes.get(0).getSenderStageId()).getWorkerIdToServerInstanceMap());
       _canSkipShuffleForJoin = true;
     }
 
@@ -174,13 +172,13 @@ public class GreedyShuffleRewriteVisitor implements PlanNodeVisitor<Set<Colocati
       } else if (colocationKeyCondition(oldColocationKeys, selector) && areServersSuperset(node.getPlanFragmentId(),
           node.getSenderStageId())) {
         node.setDistributionType(RelDistribution.Type.SINGLETON);
-        _dispatchablePlanMetadataMap.get(node.getPlanFragmentId()).setServerInstanceToWorkerIdMap(
-            _dispatchablePlanMetadataMap.get(node.getSenderStageId()).getServerInstanceToWorkerIdMap());
+        _dispatchablePlanMetadataMap.get(node.getPlanFragmentId()).setWorkerIdToServerInstanceMap(
+            _dispatchablePlanMetadataMap.get(node.getSenderStageId()).getWorkerIdToServerInstanceMap());
         return oldColocationKeys;
       }
       // This means we can't skip shuffle and there's a partitioning enforced by receiver.
-      int numPartitions =
-          _dispatchablePlanMetadataMap.get(node.getPlanFragmentId()).getServerInstanceToWorkerIdMap().size();
+      int numPartitions = new HashSet<>(
+          _dispatchablePlanMetadataMap.get(node.getPlanFragmentId()).getWorkerIdToServerInstanceMap().values()).size();
       List<ColocationKey> colocationKeys = ((FieldSelectionKeySelector) selector).getColumnIndices().stream()
           .map(x -> new ColocationKey(x, numPartitions, selector.hashAlgorithm())).collect(Collectors.toList());
       return new HashSet<>(colocationKeys);
@@ -196,8 +194,8 @@ public class GreedyShuffleRewriteVisitor implements PlanNodeVisitor<Set<Colocati
       return new HashSet<>();
     }
     // This means we can't skip shuffle and there's a partitioning enforced by receiver.
-    int numPartitions =
-        _dispatchablePlanMetadataMap.get(node.getPlanFragmentId()).getServerInstanceToWorkerIdMap().size();
+    int numPartitions = new HashSet<>(
+        _dispatchablePlanMetadataMap.get(node.getPlanFragmentId()).getWorkerIdToServerInstanceMap().values()).size();
     List<ColocationKey> colocationKeys = ((FieldSelectionKeySelector) selector).getColumnIndices().stream()
         .map(x -> new ColocationKey(x, numPartitions, selector.hashAlgorithm())).collect(Collectors.toList());
     return new HashSet<>(colocationKeys);
@@ -307,8 +305,9 @@ public class GreedyShuffleRewriteVisitor implements PlanNodeVisitor<Set<Colocati
    * Checks if servers assigned to the receiver stage are a super-set of the sender stage.
    */
   private boolean areServersSuperset(int receiverStageId, int senderStageId) {
-    return _dispatchablePlanMetadataMap.get(receiverStageId).getServerInstanceToWorkerIdMap().keySet()
-        .containsAll(_dispatchablePlanMetadataMap.get(senderStageId).getServerInstanceToWorkerIdMap().keySet());
+    return new HashSet<>(
+        _dispatchablePlanMetadataMap.get(receiverStageId).getWorkerIdToServerInstanceMap().values()).containsAll(
+        _dispatchablePlanMetadataMap.get(senderStageId).getWorkerIdToServerInstanceMap().values());
   }
 
   /*
@@ -317,15 +316,15 @@ public class GreedyShuffleRewriteVisitor implements PlanNodeVisitor<Set<Colocati
    * 2. Servers assigned to the join-stage are a superset of S.
    */
   private boolean canServerAssignmentAllowShuffleSkip(int currentStageId, int leftStageId, int rightStageId) {
-    Set<QueryServerInstance> leftServerInstances = new HashSet<>(_dispatchablePlanMetadataMap.get(leftStageId)
-        .getServerInstanceToWorkerIdMap().keySet());
-    Set<QueryServerInstance> rightServerInstances = _dispatchablePlanMetadataMap.get(rightStageId)
-        .getServerInstanceToWorkerIdMap().keySet();
-    Set<QueryServerInstance> currentServerInstances = _dispatchablePlanMetadataMap.get(currentStageId)
-        .getServerInstanceToWorkerIdMap().keySet();
+    Set<QueryServerInstance> leftServerInstances =
+        new HashSet<>(_dispatchablePlanMetadataMap.get(leftStageId).getWorkerIdToServerInstanceMap().values());
+    Set<QueryServerInstance> rightServerInstances =
+        new HashSet<>(_dispatchablePlanMetadataMap.get(rightStageId).getWorkerIdToServerInstanceMap().values());
+    Set<QueryServerInstance> currentServerInstances =
+        new HashSet<>(_dispatchablePlanMetadataMap.get(currentStageId).getWorkerIdToServerInstanceMap().values());
     return leftServerInstances.containsAll(rightServerInstances)
-        && leftServerInstances.size() == rightServerInstances.size()
-        && currentServerInstances.containsAll(leftServerInstances);
+        && leftServerInstances.size() == rightServerInstances.size() && currentServerInstances.containsAll(
+        leftServerInstances);
   }
 
   /**
diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/routing/WorkerManager.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/routing/WorkerManager.java
index 5fdb67c7c8..a57356cb33 100644
--- a/pinot-query-planner/src/main/java/org/apache/pinot/query/routing/WorkerManager.java
+++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/routing/WorkerManager.java
@@ -73,9 +73,8 @@ public class WorkerManager {
     // ROOT stage doesn't have a QueryServer as it is strictly only reducing results, so here we simply assign the
     // worker instance with identical server/mailbox port number.
     DispatchablePlanMetadata metadata = context.getDispatchablePlanMetadataMap().get(0);
-    metadata.setServerInstanceToWorkerIdMap(
-        Collections.singletonMap(new QueryServerInstance(_hostName, _port, _port), Collections.singletonList(0)));
-    metadata.setTotalWorkerCount(1);
+    metadata.setWorkerIdToServerInstanceMap(
+        Collections.singletonMap(0, new QueryServerInstance(_hostName, _port, _port)));
     for (PlanFragment child : rootFragment.getChildren()) {
       assignWorkersToNonRootFragment(child, context);
     }
@@ -156,21 +155,19 @@ public class WorkerManager {
 
       // attach unavailable segments to metadata
       if (!routingTable.getUnavailableSegments().isEmpty()) {
-        metadata.addTableToUnavailableSegmentsMap(tableName, routingTable.getUnavailableSegments());
+        metadata.addUnavailableSegments(tableName, routingTable.getUnavailableSegments());
       }
     }
-    int globalIdx = 0;
-    Map<QueryServerInstance, List<Integer>> serverInstanceToWorkerIdMap = new HashMap<>();
+    int workerId = 0;
+    Map<Integer, QueryServerInstance> workerIdToServerInstanceMap = new HashMap<>();
     Map<Integer, Map<String, List<String>>> workerIdToSegmentsMap = new HashMap<>();
     for (Map.Entry<ServerInstance, Map<String, List<String>>> entry : serverInstanceToSegmentsMap.entrySet()) {
-      QueryServerInstance queryServerInstance = new QueryServerInstance(entry.getKey());
-      serverInstanceToWorkerIdMap.put(queryServerInstance, Collections.singletonList(globalIdx));
-      workerIdToSegmentsMap.put(globalIdx, entry.getValue());
-      globalIdx++;
+      workerIdToServerInstanceMap.put(workerId, new QueryServerInstance(entry.getKey()));
+      workerIdToSegmentsMap.put(workerId, entry.getValue());
+      workerId++;
     }
-    metadata.setServerInstanceToWorkerIdMap(serverInstanceToWorkerIdMap);
+    metadata.setWorkerIdToServerInstanceMap(workerIdToServerInstanceMap);
     metadata.setWorkerIdToSegmentsMap(workerIdToSegmentsMap);
-    metadata.setTotalWorkerCount(globalIdx);
   }
 
   /**
@@ -219,8 +216,8 @@ public class WorkerManager {
     //       segments for the same partition is colocated
     long indexToPick = context.getRequestId();
     ColocatedPartitionInfo[] partitionInfoMap = colocatedTableInfo._partitionInfoMap;
-    int nextWorkerId = 0;
-    Map<QueryServerInstance, List<Integer>> serverInstanceToWorkerIdMap = new HashMap<>();
+    int workerId = 0;
+    Map<Integer, QueryServerInstance> workedIdToServerInstanceMap = new HashMap<>();
     Map<Integer, Map<String, List<String>>> workerIdToSegmentsMap = new HashMap<>();
     Map<String, ServerInstance> enabledServerInstanceMap = _routingManager.getEnabledServerInstanceMap();
     for (int i = 0; i < numPartitions; i++) {
@@ -233,15 +230,12 @@ public class WorkerManager {
           pickEnabledServer(partitionInfo._fullyReplicatedServers, enabledServerInstanceMap, indexToPick++);
       Preconditions.checkState(serverInstance != null,
           "Failed to find enabled fully replicated server for table: %s, partition: %s in table: %s", tableName, i);
-      QueryServerInstance queryServerInstance = new QueryServerInstance(serverInstance);
-      int workerId = nextWorkerId++;
-      serverInstanceToWorkerIdMap.computeIfAbsent(queryServerInstance, k -> new ArrayList<>()).add(workerId);
+      workedIdToServerInstanceMap.put(workerId, new QueryServerInstance(serverInstance));
       workerIdToSegmentsMap.put(workerId, getSegmentsMap(partitionInfo));
+      workerId++;
     }
-
-    metadata.setServerInstanceToWorkerIdMap(serverInstanceToWorkerIdMap);
+    metadata.setWorkerIdToServerInstanceMap(workedIdToServerInstanceMap);
     metadata.setWorkerIdToSegmentsMap(workerIdToSegmentsMap);
-    metadata.setTotalWorkerCount(nextWorkerId);
     metadata.setTimeBoundaryInfo(colocatedTableInfo._timeBoundaryInfo);
     metadata.setPartitionedTableScan(true);
   }
@@ -260,8 +254,7 @@ public class WorkerManager {
     if (children.size() > 0) {
       DispatchablePlanMetadata firstChildMetadata = metadataMap.get(children.get(0).getFragmentId());
       if (firstChildMetadata.isPartitionedTableScan()) {
-        metadata.setServerInstanceToWorkerIdMap(firstChildMetadata.getServerInstanceToWorkerIdMap());
-        metadata.setTotalWorkerCount(firstChildMetadata.getTotalWorkerCount());
+        metadata.setWorkerIdToServerInstanceMap(firstChildMetadata.getWorkerIdToServerInstanceMap());
         return;
       }
     }
@@ -291,22 +284,18 @@ public class WorkerManager {
     int stageParallelism = Integer.parseInt(options.getOrDefault(QueryOptionKey.STAGE_PARALLELISM, "1"));
     if (metadata.isRequiresSingletonInstance()) {
       // require singleton should return a single global worker ID with 0;
-      ServerInstance serverInstance = serverInstances.get(RANDOM.nextInt(serverInstances.size()));
-      metadata.setServerInstanceToWorkerIdMap(
-          Collections.singletonMap(new QueryServerInstance(serverInstance), Collections.singletonList(0)));
-      metadata.setTotalWorkerCount(1);
+      metadata.setWorkerIdToServerInstanceMap(Collections.singletonMap(0,
+          new QueryServerInstance(serverInstances.get(RANDOM.nextInt(serverInstances.size())))));
     } else {
-      Map<QueryServerInstance, List<Integer>> serverInstanceToWorkerIdMap = new HashMap<>();
-      int nextWorkerId = 0;
+      Map<Integer, QueryServerInstance> workerIdToServerInstanceMap = new HashMap<>();
+      int workerId = 0;
       for (ServerInstance serverInstance : serverInstances) {
-        List<Integer> workerIds = new ArrayList<>();
+        QueryServerInstance queryServerInstance = new QueryServerInstance(serverInstance);
         for (int i = 0; i < stageParallelism; i++) {
-          workerIds.add(nextWorkerId++);
+          workerIdToServerInstanceMap.put(workerId++, queryServerInstance);
         }
-        serverInstanceToWorkerIdMap.put(new QueryServerInstance(serverInstance), workerIds);
       }
-      metadata.setServerInstanceToWorkerIdMap(serverInstanceToWorkerIdMap);
-      metadata.setTotalWorkerCount(nextWorkerId);
+      metadata.setWorkerIdToServerInstanceMap(workerIdToServerInstanceMap);
     }
   }
 
diff --git a/pinot-query-planner/src/test/java/org/apache/pinot/query/QueryCompilationTest.java b/pinot-query-planner/src/test/java/org/apache/pinot/query/QueryCompilationTest.java
index 3015deea4b..e66a08a942 100644
--- a/pinot-query-planner/src/test/java/org/apache/pinot/query/QueryCompilationTest.java
+++ b/pinot-query-planner/src/test/java/org/apache/pinot/query/QueryCompilationTest.java
@@ -478,40 +478,40 @@ public class QueryCompilationTest extends QueryEnvironmentTestBase {
     return new Object[][] {
 new Object[]{"EXPLAIN IMPLEMENTATION PLAN INCLUDING ALL ATTRIBUTES FOR SELECT col1, col3 FROM a",
   "[0]@localhost:3 MAIL_RECEIVE(RANDOM_DISTRIBUTED)\n"
-  + "├── [1]@localhost:2 MAIL_SEND(RANDOM_DISTRIBUTED)->{[0]@localhost@{3,3}|[0]}\n"
-  + "│   └── [1]@localhost:2 PROJECT\n"
-  + "│      └── [1]@localhost:2 TABLE SCAN (a) null\n"
-  + "└── [1]@localhost:1 MAIL_SEND(RANDOM_DISTRIBUTED)->{[0]@localhost@{3,3}|[0]}\n"
-  + "   └── [1]@localhost:1 PROJECT\n"
-  + "      └── [1]@localhost:1 TABLE SCAN (a) null\n"},
+  + "├── [1]@localhost:1 MAIL_SEND(RANDOM_DISTRIBUTED)->{[0]@localhost@{3,3}|[0]}\n"
+  + "│   └── [1]@localhost:1 PROJECT\n"
+  + "│      └── [1]@localhost:1 TABLE SCAN (a) null\n"
+  + "└── [1]@localhost:2 MAIL_SEND(RANDOM_DISTRIBUTED)->{[0]@localhost@{3,3}|[0]}\n"
+  + "   └── [1]@localhost:2 PROJECT\n"
+  + "      └── [1]@localhost:2 TABLE SCAN (a) null\n"},
 new Object[]{"EXPLAIN IMPLEMENTATION PLAN EXCLUDING ATTRIBUTES FOR "
     + "SELECT col1, COUNT(*) FROM a GROUP BY col1",
   "[0]@localhost:3 MAIL_RECEIVE(RANDOM_DISTRIBUTED)\n"
-  + "├── [1]@localhost:2 MAIL_SEND(RANDOM_DISTRIBUTED)->{[0]@localhost@{3,3}|[0]} (Subtree Omitted)\n"
-  + "└── [1]@localhost:1 MAIL_SEND(RANDOM_DISTRIBUTED)->{[0]@localhost@{3,3}|[0]}\n"
-  + "   └── [1]@localhost:1 AGGREGATE_FINAL\n"
-  + "      └── [1]@localhost:1 MAIL_RECEIVE(HASH_DISTRIBUTED)\n"
-  + "         ├── [2]@localhost:2 MAIL_SEND(HASH_DISTRIBUTED)->{[1]@localhost@{2,2}|[0],[1]@localhost@{1,1}|[1]}\n"
-  + "         │   └── [2]@localhost:2 AGGREGATE_LEAF\n"
-  + "         │      └── [2]@localhost:2 TABLE SCAN (a) null\n"
-  + "         └── [2]@localhost:1 MAIL_SEND(HASH_DISTRIBUTED)->{[1]@localhost@{2,2}|[0],[1]@localhost@{1,1}|[1]}\n"
-  + "            └── [2]@localhost:1 AGGREGATE_LEAF\n"
-  + "               └── [2]@localhost:1 TABLE SCAN (a) null\n"},
+  + "├── [1]@localhost:1 MAIL_SEND(RANDOM_DISTRIBUTED)->{[0]@localhost@{3,3}|[0]} (Subtree Omitted)\n"
+  + "└── [1]@localhost:2 MAIL_SEND(RANDOM_DISTRIBUTED)->{[0]@localhost@{3,3}|[0]}\n"
+  + "   └── [1]@localhost:2 AGGREGATE_FINAL\n"
+  + "      └── [1]@localhost:2 MAIL_RECEIVE(HASH_DISTRIBUTED)\n"
+  + "         ├── [2]@localhost:1 MAIL_SEND(HASH_DISTRIBUTED)->{[1]@localhost@{1,1}|[1],[1]@localhost@{2,2}|[0]}\n"
+  + "         │   └── [2]@localhost:1 AGGREGATE_LEAF\n"
+  + "         │      └── [2]@localhost:1 TABLE SCAN (a) null\n"
+  + "         └── [2]@localhost:2 MAIL_SEND(HASH_DISTRIBUTED)->{[1]@localhost@{1,1}|[1],[1]@localhost@{2,2}|[0]}\n"
+  + "            └── [2]@localhost:2 AGGREGATE_LEAF\n"
+  + "               └── [2]@localhost:2 TABLE SCAN (a) null\n"},
 new Object[]{"EXPLAIN IMPLEMENTATION PLAN FOR SELECT a.col1, b.col3 FROM a JOIN b ON a.col1 = b.col1",
   "[0]@localhost:3 MAIL_RECEIVE(RANDOM_DISTRIBUTED)\n"
-  + "├── [1]@localhost:2 MAIL_SEND(RANDOM_DISTRIBUTED)->{[0]@localhost@{3,3}|[0]} (Subtree Omitted)\n"
-  + "└── [1]@localhost:1 MAIL_SEND(RANDOM_DISTRIBUTED)->{[0]@localhost@{3,3}|[0]}\n"
-  + "   └── [1]@localhost:1 PROJECT\n"
-  + "      └── [1]@localhost:1 JOIN\n"
-  + "         ├── [1]@localhost:1 MAIL_RECEIVE(HASH_DISTRIBUTED)\n"
-  + "         │   ├── [2]@localhost:2 MAIL_SEND(HASH_DISTRIBUTED)->{[1]@localhost@{2,2}|[0],[1]@localhost@{1,1}|[1]}\n"
-  + "         │   │   └── [2]@localhost:2 PROJECT\n"
-  + "         │   │      └── [2]@localhost:2 TABLE SCAN (a) null\n"
-  + "         │   └── [2]@localhost:1 MAIL_SEND(HASH_DISTRIBUTED)->{[1]@localhost@{2,2}|[0],[1]@localhost@{1,1}|[1]}\n"
-  + "         │      └── [2]@localhost:1 PROJECT\n"
-  + "         │         └── [2]@localhost:1 TABLE SCAN (a) null\n"
-  + "         └── [1]@localhost:1 MAIL_RECEIVE(HASH_DISTRIBUTED)\n"
-  + "            └── [3]@localhost:1 MAIL_SEND(HASH_DISTRIBUTED)->{[1]@localhost@{2,2}|[0],[1]@localhost@{1,1}|[1]}\n"
+  + "├── [1]@localhost:1 MAIL_SEND(RANDOM_DISTRIBUTED)->{[0]@localhost@{3,3}|[0]} (Subtree Omitted)\n"
+  + "└── [1]@localhost:2 MAIL_SEND(RANDOM_DISTRIBUTED)->{[0]@localhost@{3,3}|[0]}\n"
+  + "   └── [1]@localhost:2 PROJECT\n"
+  + "      └── [1]@localhost:2 JOIN\n"
+  + "         ├── [1]@localhost:2 MAIL_RECEIVE(HASH_DISTRIBUTED)\n"
+  + "         │   ├── [2]@localhost:1 MAIL_SEND(HASH_DISTRIBUTED)->{[1]@localhost@{1,1}|[1],[1]@localhost@{2,2}|[0]}\n"
+  + "         │   │   └── [2]@localhost:1 PROJECT\n"
+  + "         │   │      └── [2]@localhost:1 TABLE SCAN (a) null\n"
+  + "         │   └── [2]@localhost:2 MAIL_SEND(HASH_DISTRIBUTED)->{[1]@localhost@{1,1}|[1],[1]@localhost@{2,2}|[0]}\n"
+  + "         │      └── [2]@localhost:2 PROJECT\n"
+  + "         │         └── [2]@localhost:2 TABLE SCAN (a) null\n"
+  + "         └── [1]@localhost:2 MAIL_RECEIVE(HASH_DISTRIBUTED)\n"
+  + "            └── [3]@localhost:1 MAIL_SEND(HASH_DISTRIBUTED)->{[1]@localhost@{1,1}|[1],[1]@localhost@{2,2}|[0]}\n"
   + "               └── [3]@localhost:1 PROJECT\n"
   + "                  └── [3]@localhost:1 TABLE SCAN (b) null\n"}
     };


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org
