This is an automated email from the ASF dual-hosted git repository.

yashmayya pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 7761e6fa8a5 Sort servers in WorkerManager to ensure deterministic 
workerId <-> server mapping across stages in an MSE query (#17342)
7761e6fa8a5 is described below

commit 7761e6fa8a584e363b239ad9572b3e81c5143fba
Author: Yash Mayya <[email protected]>
AuthorDate: Mon Dec 15 13:06:13 2025 -0800

    Sort servers in WorkerManager to ensure deterministic workerId <-> server 
mapping across stages in an MSE query (#17342)
---
 .../apache/pinot/query/routing/WorkerManager.java  |  65 ++-
 .../resources/queries/ExplainPhysicalPlans.json    | 564 +++++++++++----------
 2 files changed, 333 insertions(+), 296 deletions(-)

diff --git 
a/pinot-query-planner/src/main/java/org/apache/pinot/query/routing/WorkerManager.java
 
b/pinot-query-planner/src/main/java/org/apache/pinot/query/routing/WorkerManager.java
index 3d379896d75..34867ef2aff 100644
--- 
a/pinot-query-planner/src/main/java/org/apache/pinot/query/routing/WorkerManager.java
+++ 
b/pinot-query-planner/src/main/java/org/apache/pinot/query/routing/WorkerManager.java
@@ -23,6 +23,7 @@ import com.google.common.collect.Maps;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
+import java.util.Comparator;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
@@ -114,7 +115,6 @@ public class WorkerManager {
     }
   }
 
-  // TODO: Ensure that workerId to server assignment is deterministic across 
all stages in a query.
   private void assignWorkersToNonRootFragment(PlanFragment fragment, 
DispatchablePlanContext context) {
     List<PlanFragment> children = fragment.getChildren();
     for (PlanFragment child : children) {
@@ -257,6 +257,10 @@ public class WorkerManager {
     List<QueryServerInstance> candidateServers = null;
     if (workerIdToServerInstanceMap == null) {
       candidateServers = getCandidateServers(context);
+      // Sort to ensure deterministic worker ID assignment across stages.
+      // This is critical for pre-partitioned exchanges where worker ID N on 
one stage should to the same physical
+      // server as worker ID N on another stage.
+      
candidateServers.sort(Comparator.comparing(QueryServerInstance::getInstanceId));
       int stageParallelism = Integer.parseInt(
           
context.getPlannerContext().getOptions().getOrDefault(QueryOptionKey.STAGE_PARALLELISM,
 "1"));
       workerIdToServerInstanceMap = 
Maps.newHashMapWithExpectedSize(candidateServers.size() * stageParallelism);
@@ -502,14 +506,28 @@ public class WorkerManager {
         metadata.addUnavailableSegments(tableName, 
routingTable.getUnavailableSegments());
       }
     }
-    int workerId = 0;
-    Map<Integer, QueryServerInstance> workerIdToServerInstanceMap = new 
HashMap<>();
-    Map<Integer, Map<String, List<String>>> workerIdToSegmentsMap = new 
HashMap<>();
-    for (Map.Entry<ServerInstance, Map<String, List<String>>> entry : 
serverInstanceToSegmentsMap.entrySet()) {
-      workerIdToServerInstanceMap.put(workerId, new 
QueryServerInstance(entry.getKey()));
-      workerIdToSegmentsMap.put(workerId, entry.getValue());
-      workerId++;
+    // Sort server instances to ensure deterministic worker ID assignment.
+    // This is critical for pre-partitioned exchanges where worker ID N on one 
stage
+    // must map to the same physical server as worker ID N on another stage.
+    List<Map.Entry<ServerInstance, Map<String, List<String>>>> 
sortedServerInstanceToSegmentsMap =
+        new ArrayList<>(serverInstanceToSegmentsMap.entrySet());
+    sortedServerInstanceToSegmentsMap.sort(Comparator.comparing(entry -> 
entry.getKey().getInstanceId()));
+
+    // Assign 1 worker per server
+    int numWorkers = sortedServerInstanceToSegmentsMap.size();
+    Map<Integer, QueryServerInstance> workerIdToServerInstanceMap = 
Maps.newHashMapWithExpectedSize(numWorkers);
+    Map<Integer, Map<String, List<String>>> workerIdToSegmentsMap = 
Maps.newHashMapWithExpectedSize(numWorkers);
+
+    for (int workerId = 0; workerId < numWorkers; workerId++) {
+      Map.Entry<ServerInstance, Map<String, List<String>>> serverEntry =
+          sortedServerInstanceToSegmentsMap.get(workerId);
+      QueryServerInstance server = new 
QueryServerInstance(serverEntry.getKey());
+      Map<String, List<String>> segmentsMap = serverEntry.getValue();
+
+      workerIdToServerInstanceMap.put(workerId, server);
+      workerIdToSegmentsMap.put(workerId, segmentsMap);
     }
+
     metadata.setWorkerIdToServerInstanceMap(workerIdToServerInstanceMap);
     metadata.setWorkerIdToSegmentsMap(workerIdToSegmentsMap);
   }
@@ -657,14 +675,27 @@ public class WorkerManager {
       }
     }
 
-    int workerId = 0;
-    Map<Integer, QueryServerInstance> workerIdToServerInstanceMap = new 
HashMap<>();
-    Map<Integer, Map<String, List<String>>> workerIdToLogicalTableSegmentsMap 
= new HashMap<>();
-    for (Map.Entry<ServerInstance, Map<String, List<String>>> entry
-        : serverInstanceToLogicalSegmentsMap.entrySet()) {
-      workerIdToServerInstanceMap.put(workerId, new 
QueryServerInstance(entry.getKey()));
-      workerIdToLogicalTableSegmentsMap.put(workerId, entry.getValue());
-      workerId++;
+    // Sort server instances to ensure deterministic worker ID assignment.
+    // This is critical for pre-partitioned exchanges where worker ID N on one 
stage
+    // must map to the same physical server as worker ID N on another stage.
+    List<Map.Entry<ServerInstance, Map<String, List<String>>>> 
sortedServerInstanceToSegmentsMap =
+        new ArrayList<>(serverInstanceToLogicalSegmentsMap.entrySet());
+    sortedServerInstanceToSegmentsMap.sort(Comparator.comparing(entry -> 
entry.getKey().getInstanceId()));
+
+    // Assign 1 worker per server
+    int numWorkers = sortedServerInstanceToSegmentsMap.size();
+    Map<Integer, QueryServerInstance> workerIdToServerInstanceMap = 
Maps.newHashMapWithExpectedSize(numWorkers);
+    Map<Integer, Map<String, List<String>>> workerIdToLogicalTableSegmentsMap =
+        Maps.newHashMapWithExpectedSize(numWorkers);
+
+    for (int workerId = 0; workerId < numWorkers; workerId++) {
+      Map.Entry<ServerInstance, Map<String, List<String>>> serverEntry =
+          sortedServerInstanceToSegmentsMap.get(workerId);
+      QueryServerInstance server = new 
QueryServerInstance(serverEntry.getKey());
+      Map<String, List<String>> segmentsMap = serverEntry.getValue();
+
+      workerIdToServerInstanceMap.put(workerId, server);
+      workerIdToLogicalTableSegmentsMap.put(workerId, segmentsMap);
     }
 
     metadata.setWorkerIdToServerInstanceMap(workerIdToServerInstanceMap);
@@ -966,7 +997,7 @@ public class WorkerManager {
       if 
(!tablePartitionReplicatedServersInfo.getSegmentsWithInvalidPartition().isEmpty())
 {
         throw new IllegalStateException(
             "Find " + 
tablePartitionReplicatedServersInfo.getSegmentsWithInvalidPartition().size()
-            + " segments with invalid partition");
+                + " segments with invalid partition");
       }
 
       int numPartitions = 
tablePartitionReplicatedServersInfo.getNumPartitions();
diff --git 
a/pinot-query-planner/src/test/resources/queries/ExplainPhysicalPlans.json 
b/pinot-query-planner/src/test/resources/queries/ExplainPhysicalPlans.json
index 382196cb967..cb0c3129b9c 100644
--- a/pinot-query-planner/src/test/resources/queries/ExplainPhysicalPlans.json
+++ b/pinot-query-planner/src/test/resources/queries/ExplainPhysicalPlans.json
@@ -6,10 +6,10 @@
         "sql": "EXPLAIN IMPLEMENTATION PLAN INCLUDING ALL ATTRIBUTES FOR 
SELECT col1, col3 FROM a",
         "output": [
           "[0]@localhost:3|[0] MAIL_RECEIVE(BROADCAST_DISTRIBUTED)\n",
-          "├── [1]@localhost:1|[1] 
MAIL_SEND(BROADCAST_DISTRIBUTED)->{[0]@localhost:3|[0]} (Subtree Omitted)\n",
-          "└── [1]@localhost:2|[0] 
MAIL_SEND(BROADCAST_DISTRIBUTED)->{[0]@localhost:3|[0]}\n",
-          "    └── [1]@localhost:2|[0] PROJECT\n",
-          "        └── [1]@localhost:2|[0] TABLE SCAN (a) null\n",
+          "├── [1]@localhost:2|[1] 
MAIL_SEND(BROADCAST_DISTRIBUTED)->{[0]@localhost:3|[0]} (Subtree Omitted)\n",
+          "└── [1]@localhost:1|[0] 
MAIL_SEND(BROADCAST_DISTRIBUTED)->{[0]@localhost:3|[0]}\n",
+          "    └── [1]@localhost:1|[0] PROJECT\n",
+          "        └── [1]@localhost:1|[0] TABLE SCAN (a) null\n",
           ""
         ]
       },
@@ -18,14 +18,14 @@
         "sql": "EXPLAIN IMPLEMENTATION PLAN EXCLUDING ATTRIBUTES FOR SELECT 
col1, COUNT(*) FROM a GROUP BY col1",
         "output": [
           "[0]@localhost:3|[0] MAIL_RECEIVE(BROADCAST_DISTRIBUTED)\n",
-          "├── [1]@localhost:1|[1] 
MAIL_SEND(BROADCAST_DISTRIBUTED)->{[0]@localhost:3|[0]} (Subtree Omitted)\n",
-          "└── [1]@localhost:2|[0] 
MAIL_SEND(BROADCAST_DISTRIBUTED)->{[0]@localhost:3|[0]}\n",
-          "    └── [1]@localhost:2|[0] AGGREGATE_FINAL\n",
-          "        └── [1]@localhost:2|[0] MAIL_RECEIVE(HASH_DISTRIBUTED)\n",
-          "            ├── [2]@localhost:1|[1] 
MAIL_SEND(HASH_DISTRIBUTED)->{[1]@localhost:1|[1],[1]@localhost:2|[0]} (Subtree 
Omitted)\n",
-          "            └── [2]@localhost:2|[0] 
MAIL_SEND(HASH_DISTRIBUTED)->{[1]@localhost:1|[1],[1]@localhost:2|[0]}\n",
-          "                └── [2]@localhost:2|[0] AGGREGATE_LEAF\n",
-          "                    └── [2]@localhost:2|[0] TABLE SCAN (a) null\n",
+          "├── [1]@localhost:2|[1] 
MAIL_SEND(BROADCAST_DISTRIBUTED)->{[0]@localhost:3|[0]} (Subtree Omitted)\n",
+          "└── [1]@localhost:1|[0] 
MAIL_SEND(BROADCAST_DISTRIBUTED)->{[0]@localhost:3|[0]}\n",
+          "    └── [1]@localhost:1|[0] AGGREGATE_FINAL\n",
+          "        └── [1]@localhost:1|[0] MAIL_RECEIVE(HASH_DISTRIBUTED)\n",
+          "            ├── [2]@localhost:2|[1] 
MAIL_SEND(HASH_DISTRIBUTED)->{[1]@localhost:1|[0],[1]@localhost:2|[1]} (Subtree 
Omitted)\n",
+          "            └── [2]@localhost:1|[0] 
MAIL_SEND(HASH_DISTRIBUTED)->{[1]@localhost:1|[0],[1]@localhost:2|[1]}\n",
+          "                └── [2]@localhost:1|[0] AGGREGATE_LEAF\n",
+          "                    └── [2]@localhost:1|[0] TABLE SCAN (a) null\n",
           ""
         ]
       },
@@ -34,17 +34,17 @@
         "sql": "EXPLAIN IMPLEMENTATION PLAN FOR SELECT a.col1, b.col3 FROM a 
JOIN b ON a.col1 = b.col1",
         "output": [
           "[0]@localhost:3|[0] MAIL_RECEIVE(BROADCAST_DISTRIBUTED)\n",
-          "├── [1]@localhost:1|[1] 
MAIL_SEND(BROADCAST_DISTRIBUTED)->{[0]@localhost:3|[0]} (Subtree Omitted)\n",
-          "└── [1]@localhost:2|[0] 
MAIL_SEND(BROADCAST_DISTRIBUTED)->{[0]@localhost:3|[0]}\n",
-          "    └── [1]@localhost:2|[0] PROJECT\n",
-          "        └── [1]@localhost:2|[0] JOIN\n",
-          "            ├── [1]@localhost:2|[0] 
MAIL_RECEIVE(HASH_DISTRIBUTED)\n",
-          "            │   ├── [2]@localhost:1|[1] 
MAIL_SEND(HASH_DISTRIBUTED)->{[1]@localhost:1|[1],[1]@localhost:2|[0]} (Subtree 
Omitted)\n",
-          "            │   └── [2]@localhost:2|[0] 
MAIL_SEND(HASH_DISTRIBUTED)->{[1]@localhost:1|[1],[1]@localhost:2|[0]}\n",
-          "            │       └── [2]@localhost:2|[0] PROJECT\n",
-          "            │           └── [2]@localhost:2|[0] TABLE SCAN (a) 
null\n",
-          "            └── [1]@localhost:2|[0] 
MAIL_RECEIVE(HASH_DISTRIBUTED)\n",
-          "                └── [3]@localhost:1|[0] 
MAIL_SEND(HASH_DISTRIBUTED)->{[1]@localhost:1|[1],[1]@localhost:2|[0]}\n",
+          "├── [1]@localhost:2|[1] 
MAIL_SEND(BROADCAST_DISTRIBUTED)->{[0]@localhost:3|[0]} (Subtree Omitted)\n",
+          "└── [1]@localhost:1|[0] 
MAIL_SEND(BROADCAST_DISTRIBUTED)->{[0]@localhost:3|[0]}\n",
+          "    └── [1]@localhost:1|[0] PROJECT\n",
+          "        └── [1]@localhost:1|[0] JOIN\n",
+          "            ├── [1]@localhost:1|[0] 
MAIL_RECEIVE(HASH_DISTRIBUTED)\n",
+          "            │   ├── [2]@localhost:2|[1] 
MAIL_SEND(HASH_DISTRIBUTED)->{[1]@localhost:1|[0],[1]@localhost:2|[1]} (Subtree 
Omitted)\n",
+          "            │   └── [2]@localhost:1|[0] 
MAIL_SEND(HASH_DISTRIBUTED)->{[1]@localhost:1|[0],[1]@localhost:2|[1]}\n",
+          "            │       └── [2]@localhost:1|[0] PROJECT\n",
+          "            │           └── [2]@localhost:1|[0] TABLE SCAN (a) 
null\n",
+          "            └── [1]@localhost:1|[0] 
MAIL_RECEIVE(HASH_DISTRIBUTED)\n",
+          "                └── [3]@localhost:1|[0] 
MAIL_SEND(HASH_DISTRIBUTED)->{[1]@localhost:1|[0],[1]@localhost:2|[1]}\n",
           "                    └── [3]@localhost:1|[0] PROJECT\n",
           "                        └── [3]@localhost:1|[0] TABLE SCAN (b) 
null\n",
           ""
@@ -58,22 +58,22 @@
         "sql": "EXPLAIN IMPLEMENTATION PLAN FOR SELECT a.col2, a.col3, b.col3 
FROM a /*+ tableOptions(partition_function='hashcode', partition_key='col2', 
partition_size='4') */ JOIN b /*+ tableOptions(partition_function='hashcode', 
partition_key='col1', partition_size='4') */ ON a.col1 = b.col2 WHERE b.col3 > 
0",
         "output": [
           "[0]@localhost:3|[0] MAIL_RECEIVE(BROADCAST_DISTRIBUTED)\n",
-          "├── [1]@localhost:1|[1] 
MAIL_SEND(BROADCAST_DISTRIBUTED)->{[0]@localhost:3|[0]} (Subtree Omitted)\n",
-          "└── [1]@localhost:2|[0] 
MAIL_SEND(BROADCAST_DISTRIBUTED)->{[0]@localhost:3|[0]}\n",
-          "    └── [1]@localhost:2|[0] PROJECT\n",
-          "        └── [1]@localhost:2|[0] JOIN\n",
-          "            ├── [1]@localhost:2|[0] 
MAIL_RECEIVE(HASH_DISTRIBUTED)\n",
-          "            │   ├── [2]@localhost:2|[2] 
MAIL_SEND(HASH_DISTRIBUTED)->{[1]@localhost:1|[1],[1]@localhost:2|[0]} (Subtree 
Omitted)\n",
-          "            │   ├── [2]@localhost:2|[3] 
MAIL_SEND(HASH_DISTRIBUTED)->{[1]@localhost:1|[1],[1]@localhost:2|[0]} (Subtree 
Omitted)\n",
-          "            │   ├── [2]@localhost:1|[0] 
MAIL_SEND(HASH_DISTRIBUTED)->{[1]@localhost:1|[1],[1]@localhost:2|[0]} (Subtree 
Omitted)\n",
-          "            │   └── [2]@localhost:1|[1] 
MAIL_SEND(HASH_DISTRIBUTED)->{[1]@localhost:1|[1],[1]@localhost:2|[0]}\n",
+          "├── [1]@localhost:2|[1] 
MAIL_SEND(BROADCAST_DISTRIBUTED)->{[0]@localhost:3|[0]} (Subtree Omitted)\n",
+          "└── [1]@localhost:1|[0] 
MAIL_SEND(BROADCAST_DISTRIBUTED)->{[0]@localhost:3|[0]}\n",
+          "    └── [1]@localhost:1|[0] PROJECT\n",
+          "        └── [1]@localhost:1|[0] JOIN\n",
+          "            ├── [1]@localhost:1|[0] 
MAIL_RECEIVE(HASH_DISTRIBUTED)\n",
+          "            │   ├── [2]@localhost:2|[2] 
MAIL_SEND(HASH_DISTRIBUTED)->{[1]@localhost:1|[0],[1]@localhost:2|[1]} (Subtree 
Omitted)\n",
+          "            │   ├── [2]@localhost:2|[3] 
MAIL_SEND(HASH_DISTRIBUTED)->{[1]@localhost:1|[0],[1]@localhost:2|[1]} (Subtree 
Omitted)\n",
+          "            │   ├── [2]@localhost:1|[0] 
MAIL_SEND(HASH_DISTRIBUTED)->{[1]@localhost:1|[0],[1]@localhost:2|[1]} (Subtree 
Omitted)\n",
+          "            │   └── [2]@localhost:1|[1] 
MAIL_SEND(HASH_DISTRIBUTED)->{[1]@localhost:1|[0],[1]@localhost:2|[1]}\n",
           "            │       └── [2]@localhost:1|[1] PROJECT\n",
           "            │           └── [2]@localhost:1|[1] TABLE SCAN (a) 
null\n",
-          "            └── [1]@localhost:2|[0] 
MAIL_RECEIVE(HASH_DISTRIBUTED)\n",
-          "                ├── [3]@localhost:2|[2] 
MAIL_SEND(HASH_DISTRIBUTED)->{[1]@localhost:1|[1],[1]@localhost:2|[0]} (Subtree 
Omitted)\n",
-          "                ├── [3]@localhost:2|[3] 
MAIL_SEND(HASH_DISTRIBUTED)->{[1]@localhost:1|[1],[1]@localhost:2|[0]} (Subtree 
Omitted)\n",
-          "                ├── [3]@localhost:1|[0] 
MAIL_SEND(HASH_DISTRIBUTED)->{[1]@localhost:1|[1],[1]@localhost:2|[0]} (Subtree 
Omitted)\n",
-          "                └── [3]@localhost:1|[1] 
MAIL_SEND(HASH_DISTRIBUTED)->{[1]@localhost:1|[1],[1]@localhost:2|[0]}\n",
+          "            └── [1]@localhost:1|[0] 
MAIL_RECEIVE(HASH_DISTRIBUTED)\n",
+          "                ├── [3]@localhost:2|[2] 
MAIL_SEND(HASH_DISTRIBUTED)->{[1]@localhost:1|[0],[1]@localhost:2|[1]} (Subtree 
Omitted)\n",
+          "                ├── [3]@localhost:2|[3] 
MAIL_SEND(HASH_DISTRIBUTED)->{[1]@localhost:1|[0],[1]@localhost:2|[1]} (Subtree 
Omitted)\n",
+          "                ├── [3]@localhost:1|[0] 
MAIL_SEND(HASH_DISTRIBUTED)->{[1]@localhost:1|[0],[1]@localhost:2|[1]} (Subtree 
Omitted)\n",
+          "                └── [3]@localhost:1|[1] 
MAIL_SEND(HASH_DISTRIBUTED)->{[1]@localhost:1|[0],[1]@localhost:2|[1]}\n",
           "                    └── [3]@localhost:1|[1] PROJECT\n",
           "                        └── [3]@localhost:1|[1] FILTER\n",
           "                            └── [3]@localhost:1|[1] TABLE SCAN (b) 
null\n",
@@ -85,22 +85,22 @@
         "sql": "EXPLAIN IMPLEMENTATION PLAN FOR SELECT a.col2, a.col3, b.col3 
FROM a /*+ tableOptions(partition_function='hashcode', partition_key='col2', 
partition_size='4') */ JOIN b /*+ tableOptions(partition_function='hashcode', 
partition_key='col1', partition_size='4') */ ON a.col1 = b.col1 WHERE b.col3 > 
0",
         "output": [
           "[0]@localhost:3|[0] MAIL_RECEIVE(BROADCAST_DISTRIBUTED)\n",
-          "├── [1]@localhost:1|[1] 
MAIL_SEND(BROADCAST_DISTRIBUTED)->{[0]@localhost:3|[0]} (Subtree Omitted)\n",
-          "└── [1]@localhost:2|[0] 
MAIL_SEND(BROADCAST_DISTRIBUTED)->{[0]@localhost:3|[0]}\n",
-          "    └── [1]@localhost:2|[0] PROJECT\n",
-          "        └── [1]@localhost:2|[0] JOIN\n",
-          "            ├── [1]@localhost:2|[0] 
MAIL_RECEIVE(HASH_DISTRIBUTED)\n",
-          "            │   ├── [2]@localhost:2|[2] 
MAIL_SEND(HASH_DISTRIBUTED)->{[1]@localhost:1|[1],[1]@localhost:2|[0]} (Subtree 
Omitted)\n",
-          "            │   ├── [2]@localhost:2|[3] 
MAIL_SEND(HASH_DISTRIBUTED)->{[1]@localhost:1|[1],[1]@localhost:2|[0]} (Subtree 
Omitted)\n",
-          "            │   ├── [2]@localhost:1|[0] 
MAIL_SEND(HASH_DISTRIBUTED)->{[1]@localhost:1|[1],[1]@localhost:2|[0]} (Subtree 
Omitted)\n",
-          "            │   └── [2]@localhost:1|[1] 
MAIL_SEND(HASH_DISTRIBUTED)->{[1]@localhost:1|[1],[1]@localhost:2|[0]}\n",
+          "├── [1]@localhost:2|[1] 
MAIL_SEND(BROADCAST_DISTRIBUTED)->{[0]@localhost:3|[0]} (Subtree Omitted)\n",
+          "└── [1]@localhost:1|[0] 
MAIL_SEND(BROADCAST_DISTRIBUTED)->{[0]@localhost:3|[0]}\n",
+          "    └── [1]@localhost:1|[0] PROJECT\n",
+          "        └── [1]@localhost:1|[0] JOIN\n",
+          "            ├── [1]@localhost:1|[0] 
MAIL_RECEIVE(HASH_DISTRIBUTED)\n",
+          "            │   ├── [2]@localhost:2|[2] 
MAIL_SEND(HASH_DISTRIBUTED)->{[1]@localhost:1|[0],[1]@localhost:2|[1]} (Subtree 
Omitted)\n",
+          "            │   ├── [2]@localhost:2|[3] 
MAIL_SEND(HASH_DISTRIBUTED)->{[1]@localhost:1|[0],[1]@localhost:2|[1]} (Subtree 
Omitted)\n",
+          "            │   ├── [2]@localhost:1|[0] 
MAIL_SEND(HASH_DISTRIBUTED)->{[1]@localhost:1|[0],[1]@localhost:2|[1]} (Subtree 
Omitted)\n",
+          "            │   └── [2]@localhost:1|[1] 
MAIL_SEND(HASH_DISTRIBUTED)->{[1]@localhost:1|[0],[1]@localhost:2|[1]}\n",
           "            │       └── [2]@localhost:1|[1] PROJECT\n",
           "            │           └── [2]@localhost:1|[1] TABLE SCAN (a) 
null\n",
-          "            └── [1]@localhost:2|[0] 
MAIL_RECEIVE(HASH_DISTRIBUTED)\n",
-          "                ├── [3]@localhost:2|[2] 
MAIL_SEND(HASH_DISTRIBUTED)[PARTITIONED]->{[1]@localhost:1|[1],[1]@localhost:2|[0]}
 (Subtree Omitted)\n",
-          "                ├── [3]@localhost:2|[3] 
MAIL_SEND(HASH_DISTRIBUTED)[PARTITIONED]->{[1]@localhost:1|[1],[1]@localhost:2|[0]}
 (Subtree Omitted)\n",
-          "                ├── [3]@localhost:1|[0] 
MAIL_SEND(HASH_DISTRIBUTED)[PARTITIONED]->{[1]@localhost:1|[1],[1]@localhost:2|[0]}
 (Subtree Omitted)\n",
-          "                └── [3]@localhost:1|[1] 
MAIL_SEND(HASH_DISTRIBUTED)[PARTITIONED]->{[1]@localhost:1|[1],[1]@localhost:2|[0]}\n",
+          "            └── [1]@localhost:1|[0] 
MAIL_RECEIVE(HASH_DISTRIBUTED)\n",
+          "                ├── [3]@localhost:2|[2] 
MAIL_SEND(HASH_DISTRIBUTED)[PARTITIONED]->{[1]@localhost:1|[0],[1]@localhost:2|[1]}
 (Subtree Omitted)\n",
+          "                ├── [3]@localhost:2|[3] 
MAIL_SEND(HASH_DISTRIBUTED)[PARTITIONED]->{[1]@localhost:1|[0],[1]@localhost:2|[1]}
 (Subtree Omitted)\n",
+          "                ├── [3]@localhost:1|[0] 
MAIL_SEND(HASH_DISTRIBUTED)[PARTITIONED]->{[1]@localhost:1|[0],[1]@localhost:2|[1]}
 (Subtree Omitted)\n",
+          "                └── [3]@localhost:1|[1] 
MAIL_SEND(HASH_DISTRIBUTED)[PARTITIONED]->{[1]@localhost:1|[0],[1]@localhost:2|[1]}\n",
           "                    └── [3]@localhost:1|[1] PROJECT\n",
           "                        └── [3]@localhost:1|[1] FILTER\n",
           "                            └── [3]@localhost:1|[1] TABLE SCAN (b) 
null\n",
@@ -170,14 +170,14 @@
         "sql": "EXPLAIN IMPLEMENTATION PLAN FOR SELECT a.col1, COUNT(*) FROM a 
/*+ tableOptions(partition_function='hashcode', partition_key='col2', 
partition_size='4') */ WHERE a.col2 IN (SELECT b.col1 FROM b /*+ 
tableOptions(partition_function='hashcode', partition_key='col1', 
partition_size='4') */WHERE b.col1 = 'z') GROUP BY a.col1",
         "output": [
           "[0]@localhost:3|[0] MAIL_RECEIVE(BROADCAST_DISTRIBUTED)\n",
-          "├── [1]@localhost:1|[1] 
MAIL_SEND(BROADCAST_DISTRIBUTED)->{[0]@localhost:3|[0]} (Subtree Omitted)\n",
-          "└── [1]@localhost:2|[0] 
MAIL_SEND(BROADCAST_DISTRIBUTED)->{[0]@localhost:3|[0]}\n",
-          "    └── [1]@localhost:2|[0] AGGREGATE_FINAL\n",
-          "        └── [1]@localhost:2|[0] MAIL_RECEIVE(HASH_DISTRIBUTED)\n",
-          "            ├── [2]@localhost:2|[2] 
MAIL_SEND(HASH_DISTRIBUTED)->{[1]@localhost:1|[1],[1]@localhost:2|[0]} (Subtree 
Omitted)\n",
-          "            ├── [2]@localhost:2|[3] 
MAIL_SEND(HASH_DISTRIBUTED)->{[1]@localhost:1|[1],[1]@localhost:2|[0]} (Subtree 
Omitted)\n",
-          "            ├── [2]@localhost:1|[0] 
MAIL_SEND(HASH_DISTRIBUTED)->{[1]@localhost:1|[1],[1]@localhost:2|[0]} (Subtree 
Omitted)\n",
-          "            └── [2]@localhost:1|[1] 
MAIL_SEND(HASH_DISTRIBUTED)->{[1]@localhost:1|[1],[1]@localhost:2|[0]}\n",
+          "├── [1]@localhost:2|[1] 
MAIL_SEND(BROADCAST_DISTRIBUTED)->{[0]@localhost:3|[0]} (Subtree Omitted)\n",
+          "└── [1]@localhost:1|[0] 
MAIL_SEND(BROADCAST_DISTRIBUTED)->{[0]@localhost:3|[0]}\n",
+          "    └── [1]@localhost:1|[0] AGGREGATE_FINAL\n",
+          "        └── [1]@localhost:1|[0] MAIL_RECEIVE(HASH_DISTRIBUTED)\n",
+          "            ├── [2]@localhost:2|[2] 
MAIL_SEND(HASH_DISTRIBUTED)->{[1]@localhost:1|[0],[1]@localhost:2|[1]} (Subtree 
Omitted)\n",
+          "            ├── [2]@localhost:2|[3] 
MAIL_SEND(HASH_DISTRIBUTED)->{[1]@localhost:1|[0],[1]@localhost:2|[1]} (Subtree 
Omitted)\n",
+          "            ├── [2]@localhost:1|[0] 
MAIL_SEND(HASH_DISTRIBUTED)->{[1]@localhost:1|[0],[1]@localhost:2|[1]} (Subtree 
Omitted)\n",
+          "            └── [2]@localhost:1|[1] 
MAIL_SEND(HASH_DISTRIBUTED)->{[1]@localhost:1|[0],[1]@localhost:2|[1]}\n",
           "                └── [2]@localhost:1|[1] AGGREGATE_LEAF\n",
           "                    └── [2]@localhost:1|[1] JOIN\n",
           "                        ├── [2]@localhost:1|[1] PROJECT\n",
@@ -189,7 +189,8 @@
           "                            └── [3]@localhost:1|[1] 
MAIL_SEND(BROADCAST_DISTRIBUTED)->{[2]@localhost:1|[0, 1],[2]@localhost:2|[2, 
3]}\n",
           "                                └── [3]@localhost:1|[1] PROJECT\n",
           "                                    └── [3]@localhost:1|[1] 
FILTER\n",
-          "                                        └── [3]@localhost:1|[1] 
TABLE SCAN (b) null\n"
+          "                                        └── [3]@localhost:1|[1] 
TABLE SCAN (b) null\n",
+          ""
         ]
       },
       {
@@ -197,14 +198,14 @@
         "sql": "EXPLAIN IMPLEMENTATION PLAN FOR SELECT /*+ 
joinOptions(is_colocated_by_join_keys='true') */ a.col1, COUNT(*) FROM a /*+ 
tableOptions(partition_function='hashcode', partition_key='col2', 
partition_size='4') */ WHERE a.col2 IN (SELECT b.col1 FROM b /*+ 
tableOptions(partition_function='hashcode', partition_key='col1', 
partition_size='4') */WHERE b.col1 = 'z') GROUP BY a.col1",
         "output": [
           "[0]@localhost:3|[0] MAIL_RECEIVE(BROADCAST_DISTRIBUTED)\n",
-          "├── [1]@localhost:1|[1] 
MAIL_SEND(BROADCAST_DISTRIBUTED)->{[0]@localhost:3|[0]} (Subtree Omitted)\n",
-          "└── [1]@localhost:2|[0] 
MAIL_SEND(BROADCAST_DISTRIBUTED)->{[0]@localhost:3|[0]}\n",
-          "    └── [1]@localhost:2|[0] AGGREGATE_FINAL\n",
-          "        └── [1]@localhost:2|[0] MAIL_RECEIVE(HASH_DISTRIBUTED)\n",
-          "            ├── [2]@localhost:2|[2] 
MAIL_SEND(HASH_DISTRIBUTED)->{[1]@localhost:1|[1],[1]@localhost:2|[0]} (Subtree 
Omitted)\n",
-          "            ├── [2]@localhost:2|[3] 
MAIL_SEND(HASH_DISTRIBUTED)->{[1]@localhost:1|[1],[1]@localhost:2|[0]} (Subtree 
Omitted)\n",
-          "            ├── [2]@localhost:1|[0] 
MAIL_SEND(HASH_DISTRIBUTED)->{[1]@localhost:1|[1],[1]@localhost:2|[0]} (Subtree 
Omitted)\n",
-          "            └── [2]@localhost:1|[1] 
MAIL_SEND(HASH_DISTRIBUTED)->{[1]@localhost:1|[1],[1]@localhost:2|[0]}\n",
+          "├── [1]@localhost:2|[1] 
MAIL_SEND(BROADCAST_DISTRIBUTED)->{[0]@localhost:3|[0]} (Subtree Omitted)\n",
+          "└── [1]@localhost:1|[0] 
MAIL_SEND(BROADCAST_DISTRIBUTED)->{[0]@localhost:3|[0]}\n",
+          "    └── [1]@localhost:1|[0] AGGREGATE_FINAL\n",
+          "        └── [1]@localhost:1|[0] MAIL_RECEIVE(HASH_DISTRIBUTED)\n",
+          "            ├── [2]@localhost:2|[2] 
MAIL_SEND(HASH_DISTRIBUTED)->{[1]@localhost:1|[0],[1]@localhost:2|[1]} (Subtree 
Omitted)\n",
+          "            ├── [2]@localhost:2|[3] 
MAIL_SEND(HASH_DISTRIBUTED)->{[1]@localhost:1|[0],[1]@localhost:2|[1]} (Subtree 
Omitted)\n",
+          "            ├── [2]@localhost:1|[0] 
MAIL_SEND(HASH_DISTRIBUTED)->{[1]@localhost:1|[0],[1]@localhost:2|[1]} (Subtree 
Omitted)\n",
+          "            └── [2]@localhost:1|[1] 
MAIL_SEND(HASH_DISTRIBUTED)->{[1]@localhost:1|[0],[1]@localhost:2|[1]}\n",
           "                └── [2]@localhost:1|[1] AGGREGATE_LEAF\n",
           "                    └── [2]@localhost:1|[1] JOIN\n",
           "                        ├── [2]@localhost:1|[1] PROJECT\n",
@@ -216,7 +217,8 @@
           "                            └── [3]@localhost:1|[1] 
MAIL_SEND(SINGLETON)[PARTITIONED]->{[2]@localhost:1|[1]}\n",
           "                                └── [3]@localhost:1|[1] PROJECT\n",
           "                                    └── [3]@localhost:1|[1] 
FILTER\n",
-          "                                        └── [3]@localhost:1|[1] 
TABLE SCAN (b) null\n"
+          "                                        └── [3]@localhost:1|[1] 
TABLE SCAN (b) null\n",
+          ""
         ]
       },
       {
@@ -224,14 +226,14 @@
         "sql": "EXPLAIN IMPLEMENTATION PLAN FOR SELECT /*+ 
joinOptions(is_colocated_by_join_keys='true') */ a.col1, COUNT(*) FROM a /*+ 
tableOptions(partition_function='hashcode', partition_key='col2', 
partition_size='4') */ WHERE a.col1 IN (SELECT b.col2 FROM b /*+ 
tableOptions(partition_function='hashcode', partition_key='col1', 
partition_size='4') */WHERE b.col1 = 'z') GROUP BY a.col1",
         "output": [
           "[0]@localhost:3|[0] MAIL_RECEIVE(BROADCAST_DISTRIBUTED)\n",
-          "├── [1]@localhost:1|[1] 
MAIL_SEND(BROADCAST_DISTRIBUTED)->{[0]@localhost:3|[0]} (Subtree Omitted)\n",
-          "└── [1]@localhost:2|[0] 
MAIL_SEND(BROADCAST_DISTRIBUTED)->{[0]@localhost:3|[0]}\n",
-          "    └── [1]@localhost:2|[0] AGGREGATE_FINAL\n",
-          "        └── [1]@localhost:2|[0] MAIL_RECEIVE(HASH_DISTRIBUTED)\n",
-          "            ├── [2]@localhost:2|[2] 
MAIL_SEND(HASH_DISTRIBUTED)->{[1]@localhost:1|[1],[1]@localhost:2|[0]} (Subtree 
Omitted)\n",
-          "            ├── [2]@localhost:2|[3] 
MAIL_SEND(HASH_DISTRIBUTED)->{[1]@localhost:1|[1],[1]@localhost:2|[0]} (Subtree 
Omitted)\n",
-          "            ├── [2]@localhost:1|[0] 
MAIL_SEND(HASH_DISTRIBUTED)->{[1]@localhost:1|[1],[1]@localhost:2|[0]} (Subtree 
Omitted)\n",
-          "            └── [2]@localhost:1|[1] 
MAIL_SEND(HASH_DISTRIBUTED)->{[1]@localhost:1|[1],[1]@localhost:2|[0]}\n",
+          "├── [1]@localhost:2|[1] 
MAIL_SEND(BROADCAST_DISTRIBUTED)->{[0]@localhost:3|[0]} (Subtree Omitted)\n",
+          "└── [1]@localhost:1|[0] 
MAIL_SEND(BROADCAST_DISTRIBUTED)->{[0]@localhost:3|[0]}\n",
+          "    └── [1]@localhost:1|[0] AGGREGATE_FINAL\n",
+          "        └── [1]@localhost:1|[0] MAIL_RECEIVE(HASH_DISTRIBUTED)\n",
+          "            ├── [2]@localhost:2|[2] 
MAIL_SEND(HASH_DISTRIBUTED)->{[1]@localhost:1|[0],[1]@localhost:2|[1]} (Subtree 
Omitted)\n",
+          "            ├── [2]@localhost:2|[3] 
MAIL_SEND(HASH_DISTRIBUTED)->{[1]@localhost:1|[0],[1]@localhost:2|[1]} (Subtree 
Omitted)\n",
+          "            ├── [2]@localhost:1|[0] 
MAIL_SEND(HASH_DISTRIBUTED)->{[1]@localhost:1|[0],[1]@localhost:2|[1]} (Subtree 
Omitted)\n",
+          "            └── [2]@localhost:1|[1] 
MAIL_SEND(HASH_DISTRIBUTED)->{[1]@localhost:1|[0],[1]@localhost:2|[1]}\n",
           "                └── [2]@localhost:1|[1] AGGREGATE_LEAF\n",
           "                    └── [2]@localhost:1|[1] JOIN\n",
           "                        ├── [2]@localhost:1|[1] PROJECT\n",
@@ -243,7 +245,8 @@
           "                            └── [3]@localhost:1|[1] 
MAIL_SEND(SINGLETON)[PARTITIONED]->{[2]@localhost:1|[1]}\n",
           "                                └── [3]@localhost:1|[1] PROJECT\n",
           "                                    └── [3]@localhost:1|[1] 
FILTER\n",
-          "                                        └── [3]@localhost:1|[1] 
TABLE SCAN (b) null\n"
+          "                                        └── [3]@localhost:1|[1] 
TABLE SCAN (b) null\n",
+          ""
         ]
       },
       {
@@ -271,14 +274,14 @@
         "sql": "EXPLAIN IMPLEMENTATION PLAN FOR SELECT a.col1, SUM(a.col3) 
FROM a /*+ tableOptions(partition_function='hashcode', partition_key='col2', 
partition_size='4') */  GROUP BY a.col1",
         "output": [
           "[0]@localhost:3|[0] MAIL_RECEIVE(BROADCAST_DISTRIBUTED)\n",
-          "├── [1]@localhost:1|[1] 
MAIL_SEND(BROADCAST_DISTRIBUTED)->{[0]@localhost:3|[0]} (Subtree Omitted)\n",
-          "└── [1]@localhost:2|[0] 
MAIL_SEND(BROADCAST_DISTRIBUTED)->{[0]@localhost:3|[0]}\n",
-          "    └── [1]@localhost:2|[0] AGGREGATE_FINAL\n",
-          "        └── [1]@localhost:2|[0] MAIL_RECEIVE(HASH_DISTRIBUTED)\n",
-          "            ├── [2]@localhost:2|[2] 
MAIL_SEND(HASH_DISTRIBUTED)->{[1]@localhost:1|[1],[1]@localhost:2|[0]} (Subtree 
Omitted)\n",
-          "            ├── [2]@localhost:2|[3] 
MAIL_SEND(HASH_DISTRIBUTED)->{[1]@localhost:1|[1],[1]@localhost:2|[0]} (Subtree 
Omitted)\n",
-          "            ├── [2]@localhost:1|[0] 
MAIL_SEND(HASH_DISTRIBUTED)->{[1]@localhost:1|[1],[1]@localhost:2|[0]} (Subtree 
Omitted)\n",
-          "            └── [2]@localhost:1|[1] 
MAIL_SEND(HASH_DISTRIBUTED)->{[1]@localhost:1|[1],[1]@localhost:2|[0]}\n",
+          "├── [1]@localhost:2|[1] 
MAIL_SEND(BROADCAST_DISTRIBUTED)->{[0]@localhost:3|[0]} (Subtree Omitted)\n",
+          "└── [1]@localhost:1|[0] 
MAIL_SEND(BROADCAST_DISTRIBUTED)->{[0]@localhost:3|[0]}\n",
+          "    └── [1]@localhost:1|[0] AGGREGATE_FINAL\n",
+          "        └── [1]@localhost:1|[0] MAIL_RECEIVE(HASH_DISTRIBUTED)\n",
+          "            ├── [2]@localhost:2|[2] 
MAIL_SEND(HASH_DISTRIBUTED)->{[1]@localhost:1|[0],[1]@localhost:2|[1]} (Subtree 
Omitted)\n",
+          "            ├── [2]@localhost:2|[3] 
MAIL_SEND(HASH_DISTRIBUTED)->{[1]@localhost:1|[0],[1]@localhost:2|[1]} (Subtree 
Omitted)\n",
+          "            ├── [2]@localhost:1|[0] 
MAIL_SEND(HASH_DISTRIBUTED)->{[1]@localhost:1|[0],[1]@localhost:2|[1]} (Subtree 
Omitted)\n",
+          "            └── [2]@localhost:1|[1] 
MAIL_SEND(HASH_DISTRIBUTED)->{[1]@localhost:1|[0],[1]@localhost:2|[1]}\n",
           "                └── [2]@localhost:1|[1] AGGREGATE_LEAF\n",
           "                    └── [2]@localhost:1|[1] TABLE SCAN (a) null\n",
           ""
@@ -290,14 +293,14 @@
         "comments": "TODO: currently we do not support minimum hash-set thus 
we repartition from col2 to (col1, col2), but once we support this should not 
have repartitioning.",
         "output": [
           "[0]@localhost:3|[0] MAIL_RECEIVE(BROADCAST_DISTRIBUTED)\n",
-          "├── [1]@localhost:1|[1] 
MAIL_SEND(BROADCAST_DISTRIBUTED)->{[0]@localhost:3|[0]} (Subtree Omitted)\n",
-          "└── [1]@localhost:2|[0] 
MAIL_SEND(BROADCAST_DISTRIBUTED)->{[0]@localhost:3|[0]}\n",
-          "    └── [1]@localhost:2|[0] AGGREGATE_FINAL\n",
-          "        └── [1]@localhost:2|[0] MAIL_RECEIVE(HASH_DISTRIBUTED)\n",
-          "            ├── [2]@localhost:2|[2] 
MAIL_SEND(HASH_DISTRIBUTED)->{[1]@localhost:1|[1],[1]@localhost:2|[0]} (Subtree 
Omitted)\n",
-          "            ├── [2]@localhost:2|[3] 
MAIL_SEND(HASH_DISTRIBUTED)->{[1]@localhost:1|[1],[1]@localhost:2|[0]} (Subtree 
Omitted)\n",
-          "            ├── [2]@localhost:1|[0] 
MAIL_SEND(HASH_DISTRIBUTED)->{[1]@localhost:1|[1],[1]@localhost:2|[0]} (Subtree 
Omitted)\n",
-          "            └── [2]@localhost:1|[1] 
MAIL_SEND(HASH_DISTRIBUTED)->{[1]@localhost:1|[1],[1]@localhost:2|[0]}\n",
+          "├── [1]@localhost:2|[1] 
MAIL_SEND(BROADCAST_DISTRIBUTED)->{[0]@localhost:3|[0]} (Subtree Omitted)\n",
+          "└── [1]@localhost:1|[0] 
MAIL_SEND(BROADCAST_DISTRIBUTED)->{[0]@localhost:3|[0]}\n",
+          "    └── [1]@localhost:1|[0] AGGREGATE_FINAL\n",
+          "        └── [1]@localhost:1|[0] MAIL_RECEIVE(HASH_DISTRIBUTED)\n",
+          "            ├── [2]@localhost:2|[2] 
MAIL_SEND(HASH_DISTRIBUTED)->{[1]@localhost:1|[0],[1]@localhost:2|[1]} (Subtree 
Omitted)\n",
+          "            ├── [2]@localhost:2|[3] 
MAIL_SEND(HASH_DISTRIBUTED)->{[1]@localhost:1|[0],[1]@localhost:2|[1]} (Subtree 
Omitted)\n",
+          "            ├── [2]@localhost:1|[0] 
MAIL_SEND(HASH_DISTRIBUTED)->{[1]@localhost:1|[0],[1]@localhost:2|[1]} (Subtree 
Omitted)\n",
+          "            └── [2]@localhost:1|[1] 
MAIL_SEND(HASH_DISTRIBUTED)->{[1]@localhost:1|[0],[1]@localhost:2|[1]}\n",
           "                └── [2]@localhost:1|[1] AGGREGATE_LEAF\n",
           "                    └── [2]@localhost:1|[1] TABLE SCAN (a) null\n",
           ""
@@ -343,14 +346,14 @@
         "sql": "EXPLAIN IMPLEMENTATION PLAN FOR SELECT a.col1, SUM(a.col3)  
FROM a /*+ tableOptions(partition_function='hashcode', partition_key='col2', 
partition_size='4') */  JOIN b /*+ tableOptions(partition_function='hashcode', 
partition_key='col1', partition_size='4') */    ON a.col2 = b.col1  WHERE 
b.col3 > 0  GROUP BY a.col1",
         "output": [
           "[0]@localhost:3|[0] MAIL_RECEIVE(BROADCAST_DISTRIBUTED)\n",
-          "├── [1]@localhost:1|[1] 
MAIL_SEND(BROADCAST_DISTRIBUTED)->{[0]@localhost:3|[0]} (Subtree Omitted)\n",
-          "└── [1]@localhost:2|[0] 
MAIL_SEND(BROADCAST_DISTRIBUTED)->{[0]@localhost:3|[0]}\n",
-          "    └── [1]@localhost:2|[0] AGGREGATE_FINAL\n",
-          "        └── [1]@localhost:2|[0] MAIL_RECEIVE(HASH_DISTRIBUTED)\n",
-          "            ├── [2]@localhost:2|[2] 
MAIL_SEND(HASH_DISTRIBUTED)->{[1]@localhost:1|[1],[1]@localhost:2|[0]} (Subtree 
Omitted)\n",
-          "            ├── [2]@localhost:2|[3] 
MAIL_SEND(HASH_DISTRIBUTED)->{[1]@localhost:1|[1],[1]@localhost:2|[0]} (Subtree 
Omitted)\n",
-          "            ├── [2]@localhost:1|[0] 
MAIL_SEND(HASH_DISTRIBUTED)->{[1]@localhost:1|[1],[1]@localhost:2|[0]} (Subtree 
Omitted)\n",
-          "            └── [2]@localhost:1|[1] 
MAIL_SEND(HASH_DISTRIBUTED)->{[1]@localhost:1|[1],[1]@localhost:2|[0]}\n",
+          "├── [1]@localhost:2|[1] 
MAIL_SEND(BROADCAST_DISTRIBUTED)->{[0]@localhost:3|[0]} (Subtree Omitted)\n",
+          "└── [1]@localhost:1|[0] 
MAIL_SEND(BROADCAST_DISTRIBUTED)->{[0]@localhost:3|[0]}\n",
+          "    └── [1]@localhost:1|[0] AGGREGATE_FINAL\n",
+          "        └── [1]@localhost:1|[0] MAIL_RECEIVE(HASH_DISTRIBUTED)\n",
+          "            ├── [2]@localhost:2|[2] 
MAIL_SEND(HASH_DISTRIBUTED)->{[1]@localhost:1|[0],[1]@localhost:2|[1]} (Subtree 
Omitted)\n",
+          "            ├── [2]@localhost:2|[3] 
MAIL_SEND(HASH_DISTRIBUTED)->{[1]@localhost:1|[0],[1]@localhost:2|[1]} (Subtree 
Omitted)\n",
+          "            ├── [2]@localhost:1|[0] 
MAIL_SEND(HASH_DISTRIBUTED)->{[1]@localhost:1|[0],[1]@localhost:2|[1]} (Subtree 
Omitted)\n",
+          "            └── [2]@localhost:1|[1] 
MAIL_SEND(HASH_DISTRIBUTED)->{[1]@localhost:1|[0],[1]@localhost:2|[1]}\n",
           "                └── [2]@localhost:1|[1] AGGREGATE_LEAF\n",
           "                    └── [2]@localhost:1|[1] JOIN\n",
           "                        ├── [2]@localhost:1|[1] 
MAIL_RECEIVE(HASH_DISTRIBUTED)\n",
@@ -391,16 +394,16 @@
           "                        ├── [2]@localhost:1|[1] PROJECT\n",
           "                        │   └── [2]@localhost:1|[1] TABLE SCAN (a) 
null\n",
           "                        └── [2]@localhost:1|[1] 
MAIL_RECEIVE(BROADCAST_DISTRIBUTED)\n",
-          "                            ├── [3]@localhost:1|[1] 
MAIL_SEND(BROADCAST_DISTRIBUTED)->{[2]@localhost:1|[0, 1],[2]@localhost:2|[2, 
3]} (Subtree Omitted)\n",
-          "                            └── [3]@localhost:2|[0] 
MAIL_SEND(BROADCAST_DISTRIBUTED)->{[2]@localhost:1|[0, 1],[2]@localhost:2|[2, 
3]}\n",
-          "                                └── [3]@localhost:2|[0] PROJECT\n",
-          "                                    └── [3]@localhost:2|[0] 
FILTER\n",
-          "                                        └── [3]@localhost:2|[0] 
AGGREGATE_FINAL\n",
-          "                                            └── [3]@localhost:2|[0] 
MAIL_RECEIVE(HASH_DISTRIBUTED)\n",
-          "                                                ├── 
[4]@localhost:2|[2] 
MAIL_SEND(HASH_DISTRIBUTED)->{[3]@localhost:1|[1],[3]@localhost:2|[0]} (Subtree 
Omitted)\n",
-          "                                                ├── 
[4]@localhost:2|[3] 
MAIL_SEND(HASH_DISTRIBUTED)->{[3]@localhost:1|[1],[3]@localhost:2|[0]} (Subtree 
Omitted)\n",
-          "                                                ├── 
[4]@localhost:1|[0] 
MAIL_SEND(HASH_DISTRIBUTED)->{[3]@localhost:1|[1],[3]@localhost:2|[0]} (Subtree 
Omitted)\n",
-          "                                                └── 
[4]@localhost:1|[1] 
MAIL_SEND(HASH_DISTRIBUTED)->{[3]@localhost:1|[1],[3]@localhost:2|[0]}\n",
+          "                            ├── [3]@localhost:2|[1] 
MAIL_SEND(BROADCAST_DISTRIBUTED)->{[2]@localhost:1|[0, 1],[2]@localhost:2|[2, 
3]} (Subtree Omitted)\n",
+          "                            └── [3]@localhost:1|[0] 
MAIL_SEND(BROADCAST_DISTRIBUTED)->{[2]@localhost:1|[0, 1],[2]@localhost:2|[2, 
3]}\n",
+          "                                └── [3]@localhost:1|[0] PROJECT\n",
+          "                                    └── [3]@localhost:1|[0] 
FILTER\n",
+          "                                        └── [3]@localhost:1|[0] 
AGGREGATE_FINAL\n",
+          "                                            └── [3]@localhost:1|[0] 
MAIL_RECEIVE(HASH_DISTRIBUTED)\n",
+          "                                                ├── 
[4]@localhost:2|[2] 
MAIL_SEND(HASH_DISTRIBUTED)->{[3]@localhost:1|[0],[3]@localhost:2|[1]} (Subtree 
Omitted)\n",
+          "                                                ├── 
[4]@localhost:2|[3] 
MAIL_SEND(HASH_DISTRIBUTED)->{[3]@localhost:1|[0],[3]@localhost:2|[1]} (Subtree 
Omitted)\n",
+          "                                                ├── 
[4]@localhost:1|[0] 
MAIL_SEND(HASH_DISTRIBUTED)->{[3]@localhost:1|[0],[3]@localhost:2|[1]} (Subtree 
Omitted)\n",
+          "                                                └── 
[4]@localhost:1|[1] 
MAIL_SEND(HASH_DISTRIBUTED)->{[3]@localhost:1|[0],[3]@localhost:2|[1]}\n",
           "                                                    └── 
[4]@localhost:1|[1] AGGREGATE_LEAF\n",
           "                                                        └── 
[4]@localhost:1|[1] FILTER\n",
           "                                                            └── 
[4]@localhost:1|[1] TABLE SCAN (b) null\n",
@@ -412,14 +415,14 @@
         "sql": "EXPLAIN IMPLEMENTATION PLAN FOR SELECT a.col1, SUM(a.col3)  
FROM a /*+ tableOptions(partition_function='hashcode', partition_key='col2', 
partition_size='4') */  WHERE a.col2 IN (   SELECT col1    FROM b /*+ 
tableOptions(partition_function='hashcode', partition_key='col1', 
partition_size='4') */    WHERE b.col3 > 0    GROUP BY col1 HAVING COUNT(*) > 1 
)  GROUP BY a.col1",
         "output": [
           "[0]@localhost:3|[0] MAIL_RECEIVE(BROADCAST_DISTRIBUTED)\n",
-          "├── [1]@localhost:1|[1] 
MAIL_SEND(BROADCAST_DISTRIBUTED)->{[0]@localhost:3|[0]} (Subtree Omitted)\n",
-          "└── [1]@localhost:2|[0] 
MAIL_SEND(BROADCAST_DISTRIBUTED)->{[0]@localhost:3|[0]}\n",
-          "    └── [1]@localhost:2|[0] AGGREGATE_FINAL\n",
-          "        └── [1]@localhost:2|[0] MAIL_RECEIVE(HASH_DISTRIBUTED)\n",
-          "            ├── [2]@localhost:2|[2] 
MAIL_SEND(HASH_DISTRIBUTED)->{[1]@localhost:1|[1],[1]@localhost:2|[0]} (Subtree 
Omitted)\n",
-          "            ├── [2]@localhost:2|[3] 
MAIL_SEND(HASH_DISTRIBUTED)->{[1]@localhost:1|[1],[1]@localhost:2|[0]} (Subtree 
Omitted)\n",
-          "            ├── [2]@localhost:1|[0] 
MAIL_SEND(HASH_DISTRIBUTED)->{[1]@localhost:1|[1],[1]@localhost:2|[0]} (Subtree 
Omitted)\n",
-          "            └── [2]@localhost:1|[1] 
MAIL_SEND(HASH_DISTRIBUTED)->{[1]@localhost:1|[1],[1]@localhost:2|[0]}\n",
+          "├── [1]@localhost:2|[1] 
MAIL_SEND(BROADCAST_DISTRIBUTED)->{[0]@localhost:3|[0]} (Subtree Omitted)\n",
+          "└── [1]@localhost:1|[0] 
MAIL_SEND(BROADCAST_DISTRIBUTED)->{[0]@localhost:3|[0]}\n",
+          "    └── [1]@localhost:1|[0] AGGREGATE_FINAL\n",
+          "        └── [1]@localhost:1|[0] MAIL_RECEIVE(HASH_DISTRIBUTED)\n",
+          "            ├── [2]@localhost:2|[2] 
MAIL_SEND(HASH_DISTRIBUTED)->{[1]@localhost:1|[0],[1]@localhost:2|[1]} (Subtree 
Omitted)\n",
+          "            ├── [2]@localhost:2|[3] 
MAIL_SEND(HASH_DISTRIBUTED)->{[1]@localhost:1|[0],[1]@localhost:2|[1]} (Subtree 
Omitted)\n",
+          "            ├── [2]@localhost:1|[0] 
MAIL_SEND(HASH_DISTRIBUTED)->{[1]@localhost:1|[0],[1]@localhost:2|[1]} (Subtree 
Omitted)\n",
+          "            └── [2]@localhost:1|[1] 
MAIL_SEND(HASH_DISTRIBUTED)->{[1]@localhost:1|[0],[1]@localhost:2|[1]}\n",
           "                └── [2]@localhost:1|[1] AGGREGATE_LEAF\n",
           "                    └── [2]@localhost:1|[1] JOIN\n",
           "                        ├── [2]@localhost:1|[1] PROJECT\n",
@@ -448,26 +451,26 @@
         "sql": "EXPLAIN IMPLEMENTATION PLAN FOR SELECT * FROM ( SELECT a.col2 
AS col2 FROM a GROUP BY a.col2 HAVING COUNT(*) > 10 ) tmpA JOIN (SELECT b.col1 
AS col1, SUM(b.col3) AS val FROM b GROUP BY b.col1 ) tmpB ON tmpA.col2 = 
tmpB.col1",
         "output": [
           "[0]@localhost:3|[0] MAIL_RECEIVE(BROADCAST_DISTRIBUTED)\n",
-          "├── [1]@localhost:1|[1] 
MAIL_SEND(BROADCAST_DISTRIBUTED)->{[0]@localhost:3|[0]} (Subtree Omitted)\n",
-          "└── [1]@localhost:2|[0] 
MAIL_SEND(BROADCAST_DISTRIBUTED)->{[0]@localhost:3|[0]}\n",
-          "    └── [1]@localhost:2|[0] JOIN\n",
-          "        ├── [1]@localhost:2|[0] MAIL_RECEIVE(HASH_DISTRIBUTED)\n",
-          "        │   ├── [2]@localhost:1|[1] 
MAIL_SEND(HASH_DISTRIBUTED)[PARTITIONED]->{[1]@localhost:1|[1]} (Subtree 
Omitted)\n",
-          "        │   └── [2]@localhost:2|[0] 
MAIL_SEND(HASH_DISTRIBUTED)[PARTITIONED]->{[1]@localhost:2|[0]}\n",
-          "        │       └── [2]@localhost:2|[0] PROJECT\n",
-          "        │           └── [2]@localhost:2|[0] FILTER\n",
-          "        │               └── [2]@localhost:2|[0] AGGREGATE_FINAL\n",
-          "        │                   └── [2]@localhost:2|[0] 
MAIL_RECEIVE(HASH_DISTRIBUTED)\n",
-          "        │                       ├── [3]@localhost:1|[1] 
MAIL_SEND(HASH_DISTRIBUTED)->{[2]@localhost:1|[1],[2]@localhost:2|[0]} (Subtree 
Omitted)\n",
-          "        │                       └── [3]@localhost:2|[0] 
MAIL_SEND(HASH_DISTRIBUTED)->{[2]@localhost:1|[1],[2]@localhost:2|[0]}\n",
-          "        │                           └── [3]@localhost:2|[0] 
AGGREGATE_LEAF\n",
-          "        │                               └── [3]@localhost:2|[0] 
TABLE SCAN (a) null\n",
-          "        └── [1]@localhost:2|[0] MAIL_RECEIVE(HASH_DISTRIBUTED)\n",
-          "            ├── [4]@localhost:1|[1] 
MAIL_SEND(HASH_DISTRIBUTED)[PARTITIONED]->{[1]@localhost:1|[1]} (Subtree 
Omitted)\n",
-          "            └── [4]@localhost:2|[0] 
MAIL_SEND(HASH_DISTRIBUTED)[PARTITIONED]->{[1]@localhost:2|[0]}\n",
-          "                └── [4]@localhost:2|[0] AGGREGATE_FINAL\n",
-          "                    └── [4]@localhost:2|[0] 
MAIL_RECEIVE(HASH_DISTRIBUTED)\n",
-          "                        └── [5]@localhost:1|[0] 
MAIL_SEND(HASH_DISTRIBUTED)->{[4]@localhost:1|[1],[4]@localhost:2|[0]}\n",
+          "├── [1]@localhost:2|[1] 
MAIL_SEND(BROADCAST_DISTRIBUTED)->{[0]@localhost:3|[0]} (Subtree Omitted)\n",
+          "└── [1]@localhost:1|[0] 
MAIL_SEND(BROADCAST_DISTRIBUTED)->{[0]@localhost:3|[0]}\n",
+          "    └── [1]@localhost:1|[0] JOIN\n",
+          "        ├── [1]@localhost:1|[0] MAIL_RECEIVE(HASH_DISTRIBUTED)\n",
+          "        │   ├── [2]@localhost:2|[1] 
MAIL_SEND(HASH_DISTRIBUTED)[PARTITIONED]->{[1]@localhost:2|[1]} (Subtree 
Omitted)\n",
+          "        │   └── [2]@localhost:1|[0] 
MAIL_SEND(HASH_DISTRIBUTED)[PARTITIONED]->{[1]@localhost:1|[0]}\n",
+          "        │       └── [2]@localhost:1|[0] PROJECT\n",
+          "        │           └── [2]@localhost:1|[0] FILTER\n",
+          "        │               └── [2]@localhost:1|[0] AGGREGATE_FINAL\n",
+          "        │                   └── [2]@localhost:1|[0] 
MAIL_RECEIVE(HASH_DISTRIBUTED)\n",
+          "        │                       ├── [3]@localhost:2|[1] 
MAIL_SEND(HASH_DISTRIBUTED)->{[2]@localhost:1|[0],[2]@localhost:2|[1]} (Subtree 
Omitted)\n",
+          "        │                       └── [3]@localhost:1|[0] 
MAIL_SEND(HASH_DISTRIBUTED)->{[2]@localhost:1|[0],[2]@localhost:2|[1]}\n",
+          "        │                           └── [3]@localhost:1|[0] 
AGGREGATE_LEAF\n",
+          "        │                               └── [3]@localhost:1|[0] 
TABLE SCAN (a) null\n",
+          "        └── [1]@localhost:1|[0] MAIL_RECEIVE(HASH_DISTRIBUTED)\n",
+          "            ├── [4]@localhost:2|[1] 
MAIL_SEND(HASH_DISTRIBUTED)[PARTITIONED]->{[1]@localhost:2|[1]} (Subtree 
Omitted)\n",
+          "            └── [4]@localhost:1|[0] 
MAIL_SEND(HASH_DISTRIBUTED)[PARTITIONED]->{[1]@localhost:1|[0]}\n",
+          "                └── [4]@localhost:1|[0] AGGREGATE_FINAL\n",
+          "                    └── [4]@localhost:1|[0] 
MAIL_RECEIVE(HASH_DISTRIBUTED)\n",
+          "                        └── [5]@localhost:1|[0] 
MAIL_SEND(HASH_DISTRIBUTED)->{[4]@localhost:1|[0],[4]@localhost:2|[1]}\n",
           "                            └── [5]@localhost:1|[0] 
AGGREGATE_LEAF\n",
           "                                └── [5]@localhost:1|[0] TABLE SCAN 
(b) null\n",
           ""
@@ -478,27 +481,27 @@
         "sql": "EXPLAIN IMPLEMENTATION PLAN FOR SELECT col2, SUM(val) FROM ( 
SELECT a.col2 AS col2 FROM a GROUP BY a.col2 HAVING COUNT(*) > 10 ) tmpA JOIN 
(SELECT b.col1 AS col1, SUM(b.col3) AS val FROM b GROUP BY b.col1 ) tmpB ON 
tmpA.col2 = tmpB.col1 GROUP BY col2",
         "output": [
           "[0]@localhost:3|[0] MAIL_RECEIVE(BROADCAST_DISTRIBUTED)\n",
-          "├── [1]@localhost:1|[1] 
MAIL_SEND(BROADCAST_DISTRIBUTED)->{[0]@localhost:3|[0]} (Subtree Omitted)\n",
-          "└── [1]@localhost:2|[0] 
MAIL_SEND(BROADCAST_DISTRIBUTED)->{[0]@localhost:3|[0]}\n",
-          "    └── [1]@localhost:2|[0] PROJECT\n",
-          "        └── [1]@localhost:2|[0] JOIN\n",
-          "            ├── [1]@localhost:2|[0] 
MAIL_RECEIVE(HASH_DISTRIBUTED)\n",
-          "            │   ├── [2]@localhost:1|[1] 
MAIL_SEND(HASH_DISTRIBUTED)[PARTITIONED]->{[1]@localhost:1|[1]} (Subtree 
Omitted)\n",
-          "            │   └── [2]@localhost:2|[0] 
MAIL_SEND(HASH_DISTRIBUTED)[PARTITIONED]->{[1]@localhost:2|[0]}\n",
-          "            │       └── [2]@localhost:2|[0] PROJECT\n",
-          "            │           └── [2]@localhost:2|[0] FILTER\n",
-          "            │               └── [2]@localhost:2|[0] 
AGGREGATE_FINAL\n",
-          "            │                   └── [2]@localhost:2|[0] 
MAIL_RECEIVE(HASH_DISTRIBUTED)\n",
-          "            │                       ├── [3]@localhost:1|[1] 
MAIL_SEND(HASH_DISTRIBUTED)->{[2]@localhost:1|[1],[2]@localhost:2|[0]} (Subtree 
Omitted)\n",
-          "            │                       └── [3]@localhost:2|[0] 
MAIL_SEND(HASH_DISTRIBUTED)->{[2]@localhost:1|[1],[2]@localhost:2|[0]}\n",
-          "            │                           └── [3]@localhost:2|[0] 
AGGREGATE_LEAF\n",
-          "            │                               └── [3]@localhost:2|[0] 
TABLE SCAN (a) null\n",
-          "            └── [1]@localhost:2|[0] 
MAIL_RECEIVE(HASH_DISTRIBUTED)\n",
-          "                ├── [4]@localhost:1|[1] 
MAIL_SEND(HASH_DISTRIBUTED)[PARTITIONED]->{[1]@localhost:1|[1]} (Subtree 
Omitted)\n",
-          "                └── [4]@localhost:2|[0] 
MAIL_SEND(HASH_DISTRIBUTED)[PARTITIONED]->{[1]@localhost:2|[0]}\n",
-          "                    └── [4]@localhost:2|[0] AGGREGATE_FINAL\n",
-          "                        └── [4]@localhost:2|[0] 
MAIL_RECEIVE(HASH_DISTRIBUTED)\n",
-          "                            └── [5]@localhost:1|[0] 
MAIL_SEND(HASH_DISTRIBUTED)->{[4]@localhost:1|[1],[4]@localhost:2|[0]}\n",
+          "├── [1]@localhost:2|[1] 
MAIL_SEND(BROADCAST_DISTRIBUTED)->{[0]@localhost:3|[0]} (Subtree Omitted)\n",
+          "└── [1]@localhost:1|[0] 
MAIL_SEND(BROADCAST_DISTRIBUTED)->{[0]@localhost:3|[0]}\n",
+          "    └── [1]@localhost:1|[0] PROJECT\n",
+          "        └── [1]@localhost:1|[0] JOIN\n",
+          "            ├── [1]@localhost:1|[0] 
MAIL_RECEIVE(HASH_DISTRIBUTED)\n",
+          "            │   ├── [2]@localhost:2|[1] 
MAIL_SEND(HASH_DISTRIBUTED)[PARTITIONED]->{[1]@localhost:2|[1]} (Subtree 
Omitted)\n",
+          "            │   └── [2]@localhost:1|[0] 
MAIL_SEND(HASH_DISTRIBUTED)[PARTITIONED]->{[1]@localhost:1|[0]}\n",
+          "            │       └── [2]@localhost:1|[0] PROJECT\n",
+          "            │           └── [2]@localhost:1|[0] FILTER\n",
+          "            │               └── [2]@localhost:1|[0] 
AGGREGATE_FINAL\n",
+          "            │                   └── [2]@localhost:1|[0] 
MAIL_RECEIVE(HASH_DISTRIBUTED)\n",
+          "            │                       ├── [3]@localhost:2|[1] 
MAIL_SEND(HASH_DISTRIBUTED)->{[2]@localhost:1|[0],[2]@localhost:2|[1]} (Subtree 
Omitted)\n",
+          "            │                       └── [3]@localhost:1|[0] 
MAIL_SEND(HASH_DISTRIBUTED)->{[2]@localhost:1|[0],[2]@localhost:2|[1]}\n",
+          "            │                           └── [3]@localhost:1|[0] 
AGGREGATE_LEAF\n",
+          "            │                               └── [3]@localhost:1|[0] 
TABLE SCAN (a) null\n",
+          "            └── [1]@localhost:1|[0] 
MAIL_RECEIVE(HASH_DISTRIBUTED)\n",
+          "                ├── [4]@localhost:2|[1] 
MAIL_SEND(HASH_DISTRIBUTED)[PARTITIONED]->{[1]@localhost:2|[1]} (Subtree 
Omitted)\n",
+          "                └── [4]@localhost:1|[0] 
MAIL_SEND(HASH_DISTRIBUTED)[PARTITIONED]->{[1]@localhost:1|[0]}\n",
+          "                    └── [4]@localhost:1|[0] AGGREGATE_FINAL\n",
+          "                        └── [4]@localhost:1|[0] 
MAIL_RECEIVE(HASH_DISTRIBUTED)\n",
+          "                            └── [5]@localhost:1|[0] 
MAIL_SEND(HASH_DISTRIBUTED)->{[4]@localhost:1|[0],[4]@localhost:2|[1]}\n",
           "                                └── [5]@localhost:1|[0] 
AGGREGATE_LEAF\n",
           "                                    └── [5]@localhost:1|[0] TABLE 
SCAN (b) null\n",
           ""
@@ -536,24 +539,25 @@
         "sql": "EXPLAIN IMPLEMENTATION PLAN FOR SELECT a.mySum, b.col3 FROM 
(select col3 as col2, col3 + col2 as mySum from a /*+ 
tableOptions(partition_function='hashcode', partition_key='col2', 
partition_size='4') */) as a JOIN b /*+ 
tableOptions(partition_function='hashcode', partition_key='col1', 
partition_size='4') */ ON a.col2 = b.col1",
         "output": [
           "[0]@localhost:3|[0] MAIL_RECEIVE(BROADCAST_DISTRIBUTED)\n",
-          "├── [1]@localhost:1|[1] 
MAIL_SEND(BROADCAST_DISTRIBUTED)->{[0]@localhost:3|[0]} (Subtree Omitted)\n",
-          "└── [1]@localhost:2|[0] 
MAIL_SEND(BROADCAST_DISTRIBUTED)->{[0]@localhost:3|[0]}\n",
-          "    └── [1]@localhost:2|[0] PROJECT\n",
-          "        └── [1]@localhost:2|[0] JOIN\n",
-          "            ├── [1]@localhost:2|[0] 
MAIL_RECEIVE(HASH_DISTRIBUTED)\n",
-          "            │   ├── [2]@localhost:2|[2] 
MAIL_SEND(HASH_DISTRIBUTED)->{[1]@localhost:1|[1],[1]@localhost:2|[0]} (Subtree 
Omitted)\n",
-          "            │   ├── [2]@localhost:2|[3] 
MAIL_SEND(HASH_DISTRIBUTED)->{[1]@localhost:1|[1],[1]@localhost:2|[0]} (Subtree 
Omitted)\n",
-          "            │   ├── [2]@localhost:1|[0] 
MAIL_SEND(HASH_DISTRIBUTED)->{[1]@localhost:1|[1],[1]@localhost:2|[0]} (Subtree 
Omitted)\n",
-          "            │   └── [2]@localhost:1|[1] 
MAIL_SEND(HASH_DISTRIBUTED)->{[1]@localhost:1|[1],[1]@localhost:2|[0]}\n",
+          "├── [1]@localhost:2|[1] 
MAIL_SEND(BROADCAST_DISTRIBUTED)->{[0]@localhost:3|[0]} (Subtree Omitted)\n",
+          "└── [1]@localhost:1|[0] 
MAIL_SEND(BROADCAST_DISTRIBUTED)->{[0]@localhost:3|[0]}\n",
+          "    └── [1]@localhost:1|[0] PROJECT\n",
+          "        └── [1]@localhost:1|[0] JOIN\n",
+          "            ├── [1]@localhost:1|[0] 
MAIL_RECEIVE(HASH_DISTRIBUTED)\n",
+          "            │   ├── [2]@localhost:2|[2] 
MAIL_SEND(HASH_DISTRIBUTED)->{[1]@localhost:1|[0],[1]@localhost:2|[1]} (Subtree 
Omitted)\n",
+          "            │   ├── [2]@localhost:2|[3] 
MAIL_SEND(HASH_DISTRIBUTED)->{[1]@localhost:1|[0],[1]@localhost:2|[1]} (Subtree 
Omitted)\n",
+          "            │   ├── [2]@localhost:1|[0] 
MAIL_SEND(HASH_DISTRIBUTED)->{[1]@localhost:1|[0],[1]@localhost:2|[1]} (Subtree 
Omitted)\n",
+          "            │   └── [2]@localhost:1|[1] 
MAIL_SEND(HASH_DISTRIBUTED)->{[1]@localhost:1|[0],[1]@localhost:2|[1]}\n",
           "            │       └── [2]@localhost:1|[1] PROJECT\n",
           "            │           └── [2]@localhost:1|[1] TABLE SCAN (a) 
null\n",
-          "            └── [1]@localhost:2|[0] 
MAIL_RECEIVE(HASH_DISTRIBUTED)\n",
-          "                ├── [3]@localhost:2|[2] 
MAIL_SEND(HASH_DISTRIBUTED)->{[1]@localhost:1|[1],[1]@localhost:2|[0]} (Subtree 
Omitted)\n",
-          "                ├── [3]@localhost:2|[3] 
MAIL_SEND(HASH_DISTRIBUTED)->{[1]@localhost:1|[1],[1]@localhost:2|[0]} (Subtree 
Omitted)\n",
-          "                ├── [3]@localhost:1|[0] 
MAIL_SEND(HASH_DISTRIBUTED)->{[1]@localhost:1|[1],[1]@localhost:2|[0]} (Subtree 
Omitted)\n",
-          "                └── [3]@localhost:1|[1] 
MAIL_SEND(HASH_DISTRIBUTED)->{[1]@localhost:1|[1],[1]@localhost:2|[0]}\n",
+          "            └── [1]@localhost:1|[0] 
MAIL_RECEIVE(HASH_DISTRIBUTED)\n",
+          "                ├── [3]@localhost:2|[2] 
MAIL_SEND(HASH_DISTRIBUTED)->{[1]@localhost:1|[0],[1]@localhost:2|[1]} (Subtree 
Omitted)\n",
+          "                ├── [3]@localhost:2|[3] 
MAIL_SEND(HASH_DISTRIBUTED)->{[1]@localhost:1|[0],[1]@localhost:2|[1]} (Subtree 
Omitted)\n",
+          "                ├── [3]@localhost:1|[0] 
MAIL_SEND(HASH_DISTRIBUTED)->{[1]@localhost:1|[0],[1]@localhost:2|[1]} (Subtree 
Omitted)\n",
+          "                └── [3]@localhost:1|[1] 
MAIL_SEND(HASH_DISTRIBUTED)->{[1]@localhost:1|[0],[1]@localhost:2|[1]}\n",
           "                    └── [3]@localhost:1|[1] PROJECT\n",
-          "                        └── [3]@localhost:1|[1] TABLE SCAN (b) 
null\n"
+          "                        └── [3]@localhost:1|[1] TABLE SCAN (b) 
null\n",
+          ""
         ]
       },
       {
@@ -561,29 +565,30 @@
         "sql": "SET useSpools=true; EXPLAIN IMPLEMENTATION PLAN FOR SELECT 1 
FROM a as a1 JOIN b ON a1.col1 = b.col1 JOIN a as a2 ON a2.col1 = b.col1",
         "output": [
           "[0]@localhost:3|[0] MAIL_RECEIVE(BROADCAST_DISTRIBUTED)\n",
-          "├── [1]@localhost:1|[1] 
MAIL_SEND(BROADCAST_DISTRIBUTED)->{[0]@localhost:3|[0]} (Subtree Omitted)\n",
-          "└── [1]@localhost:2|[0] 
MAIL_SEND(BROADCAST_DISTRIBUTED)->{[0]@localhost:3|[0]}\n",
-          "    └── [1]@localhost:2|[0] PROJECT\n",
-          "        └── [1]@localhost:2|[0] JOIN\n",
-          "            ├── [1]@localhost:2|[0] 
MAIL_RECEIVE(HASH_DISTRIBUTED)\n",
-          "            │   ├── [2]@localhost:1|[1] 
MAIL_SEND(HASH_DISTRIBUTED)->{[1]@localhost:1|[1],[1]@localhost:2|[0]} (Subtree 
Omitted)\n",
-          "            │   └── [2]@localhost:2|[0] 
MAIL_SEND(HASH_DISTRIBUTED)->{[1]@localhost:1|[1],[1]@localhost:2|[0]}\n",
-          "            │       └── [2]@localhost:2|[0] PROJECT\n",
-          "            │           └── [2]@localhost:2|[0] JOIN\n",
-          "            │               ├── [2]@localhost:2|[0] 
MAIL_RECEIVE(HASH_DISTRIBUTED)\n",
-          "            │               │   ├── [3]@localhost:1|[1] 
MAIL_SEND(HASH_DISTRIBUTED)->{[1]@localhost:1|[1],[1]@localhost:2|[0],[2]@localhost:1|[1],[2]@localhost:2|[0]}
 (Subtree Omitted)\n",
-          "            │               │   └── [3]@localhost:2|[0] 
MAIL_SEND(HASH_DISTRIBUTED)->{[1]@localhost:1|[1],[1]@localhost:2|[0],[2]@localhost:1|[1],[2]@localhost:2|[0]}\n",
-          "            │               │       └── [3]@localhost:2|[0] 
PROJECT\n",
-          "            │               │           └── [3]@localhost:2|[0] 
TABLE SCAN (a) null\n",
-          "            │               └── [2]@localhost:2|[0] 
MAIL_RECEIVE(HASH_DISTRIBUTED)\n",
-          "            │                   └── [4]@localhost:1|[0] 
MAIL_SEND(HASH_DISTRIBUTED)->{[2]@localhost:1|[1],[2]@localhost:2|[0]}\n",
+          "├── [1]@localhost:2|[1] 
MAIL_SEND(BROADCAST_DISTRIBUTED)->{[0]@localhost:3|[0]} (Subtree Omitted)\n",
+          "└── [1]@localhost:1|[0] 
MAIL_SEND(BROADCAST_DISTRIBUTED)->{[0]@localhost:3|[0]}\n",
+          "    └── [1]@localhost:1|[0] PROJECT\n",
+          "        └── [1]@localhost:1|[0] JOIN\n",
+          "            ├── [1]@localhost:1|[0] 
MAIL_RECEIVE(HASH_DISTRIBUTED)\n",
+          "            │   ├── [2]@localhost:2|[1] 
MAIL_SEND(HASH_DISTRIBUTED)->{[1]@localhost:1|[0],[1]@localhost:2|[1]} (Subtree 
Omitted)\n",
+          "            │   └── [2]@localhost:1|[0] 
MAIL_SEND(HASH_DISTRIBUTED)->{[1]@localhost:1|[0],[1]@localhost:2|[1]}\n",
+          "            │       └── [2]@localhost:1|[0] PROJECT\n",
+          "            │           └── [2]@localhost:1|[0] JOIN\n",
+          "            │               ├── [2]@localhost:1|[0] 
MAIL_RECEIVE(HASH_DISTRIBUTED)\n",
+          "            │               │   ├── [3]@localhost:2|[1] 
MAIL_SEND(HASH_DISTRIBUTED)->{[1]@localhost:1|[0],[1]@localhost:2|[1],[2]@localhost:1|[0],[2]@localhost:2|[1]}
 (Subtree Omitted)\n",
+          "            │               │   └── [3]@localhost:1|[0] 
MAIL_SEND(HASH_DISTRIBUTED)->{[1]@localhost:1|[0],[1]@localhost:2|[1],[2]@localhost:1|[0],[2]@localhost:2|[1]}\n",
+          "            │               │       └── [3]@localhost:1|[0] 
PROJECT\n",
+          "            │               │           └── [3]@localhost:1|[0] 
TABLE SCAN (a) null\n",
+          "            │               └── [2]@localhost:1|[0] 
MAIL_RECEIVE(HASH_DISTRIBUTED)\n",
+          "            │                   └── [4]@localhost:1|[0] 
MAIL_SEND(HASH_DISTRIBUTED)->{[2]@localhost:1|[0],[2]@localhost:2|[1]}\n",
           "            │                       └── [4]@localhost:1|[0] 
PROJECT\n",
           "            │                           └── [4]@localhost:1|[0] 
TABLE SCAN (b) null\n",
-          "            └── [1]@localhost:2|[0] 
MAIL_RECEIVE(HASH_DISTRIBUTED)\n",
-          "                ├── [3]@localhost:1|[1] 
MAIL_SEND(HASH_DISTRIBUTED)->{[1]@localhost:1|[1],[1]@localhost:2|[0],[2]@localhost:1|[1],[2]@localhost:2|[0]}
 (Subtree Omitted)\n",
-          "                └── [3]@localhost:2|[0] 
MAIL_SEND(HASH_DISTRIBUTED)->{[1]@localhost:1|[1],[1]@localhost:2|[0],[2]@localhost:1|[1],[2]@localhost:2|[0]}\n",
-          "                    └── [3]@localhost:2|[0] PROJECT\n",
-          "                        └── [3]@localhost:2|[0] TABLE SCAN (a) 
null\n"
+          "            └── [1]@localhost:1|[0] 
MAIL_RECEIVE(HASH_DISTRIBUTED)\n",
+          "                ├── [3]@localhost:2|[1] 
MAIL_SEND(HASH_DISTRIBUTED)->{[1]@localhost:1|[0],[1]@localhost:2|[1],[2]@localhost:1|[0],[2]@localhost:2|[1]}
 (Subtree Omitted)\n",
+          "                └── [3]@localhost:1|[0] 
MAIL_SEND(HASH_DISTRIBUTED)->{[1]@localhost:1|[0],[1]@localhost:2|[1],[2]@localhost:1|[0],[2]@localhost:2|[1]}\n",
+          "                    └── [3]@localhost:1|[0] PROJECT\n",
+          "                        └── [3]@localhost:1|[0] TABLE SCAN (a) 
null\n",
+          ""
         ]
       },
       {
@@ -591,61 +596,62 @@
         "sql": "SET useSpools=true; EXPLAIN IMPLEMENTATION PLAN FOR WITH 
mySpool AS (select * from a) SELECT 1 FROM mySpool as a1 JOIN b ON a1.col1 = 
b.col1 JOIN mySpool as a2 ON a2.col1 = b.col1",
         "output": [
           "[0]@localhost:3|[0] MAIL_RECEIVE(BROADCAST_DISTRIBUTED)\n",
-          "├── [1]@localhost:1|[1] 
MAIL_SEND(BROADCAST_DISTRIBUTED)->{[0]@localhost:3|[0]} (Subtree Omitted)\n",
-          "└── [1]@localhost:2|[0] 
MAIL_SEND(BROADCAST_DISTRIBUTED)->{[0]@localhost:3|[0]}\n",
-          "    └── [1]@localhost:2|[0] PROJECT\n",
-          "        └── [1]@localhost:2|[0] JOIN\n",
-          "            ├── [1]@localhost:2|[0] 
MAIL_RECEIVE(HASH_DISTRIBUTED)\n",
-          "            │   ├── [2]@localhost:1|[1] 
MAIL_SEND(HASH_DISTRIBUTED)->{[1]@localhost:1|[1],[1]@localhost:2|[0]} (Subtree 
Omitted)\n",
-          "            │   └── [2]@localhost:2|[0] 
MAIL_SEND(HASH_DISTRIBUTED)->{[1]@localhost:1|[1],[1]@localhost:2|[0]}\n",
-          "            │       └── [2]@localhost:2|[0] PROJECT\n",
-          "            │           └── [2]@localhost:2|[0] JOIN\n",
-          "            │               ├── [2]@localhost:2|[0] 
MAIL_RECEIVE(HASH_DISTRIBUTED)\n",
-          "            │               │   ├── [3]@localhost:1|[1] 
MAIL_SEND(HASH_DISTRIBUTED)->{[1]@localhost:1|[1],[1]@localhost:2|[0],[2]@localhost:1|[1],[2]@localhost:2|[0]}
 (Subtree Omitted)\n",
-          "            │               │   └── [3]@localhost:2|[0] 
MAIL_SEND(HASH_DISTRIBUTED)->{[1]@localhost:1|[1],[1]@localhost:2|[0],[2]@localhost:1|[1],[2]@localhost:2|[0]}\n",
-          "            │               │       └── [3]@localhost:2|[0] 
PROJECT\n",
-          "            │               │           └── [3]@localhost:2|[0] 
TABLE SCAN (a) null\n",
-          "            │               └── [2]@localhost:2|[0] 
MAIL_RECEIVE(HASH_DISTRIBUTED)\n",
-          "            │                   └── [4]@localhost:1|[0] 
MAIL_SEND(HASH_DISTRIBUTED)->{[2]@localhost:1|[1],[2]@localhost:2|[0]}\n",
+          "├── [1]@localhost:2|[1] 
MAIL_SEND(BROADCAST_DISTRIBUTED)->{[0]@localhost:3|[0]} (Subtree Omitted)\n",
+          "└── [1]@localhost:1|[0] 
MAIL_SEND(BROADCAST_DISTRIBUTED)->{[0]@localhost:3|[0]}\n",
+          "    └── [1]@localhost:1|[0] PROJECT\n",
+          "        └── [1]@localhost:1|[0] JOIN\n",
+          "            ├── [1]@localhost:1|[0] 
MAIL_RECEIVE(HASH_DISTRIBUTED)\n",
+          "            │   ├── [2]@localhost:2|[1] 
MAIL_SEND(HASH_DISTRIBUTED)->{[1]@localhost:1|[0],[1]@localhost:2|[1]} (Subtree 
Omitted)\n",
+          "            │   └── [2]@localhost:1|[0] 
MAIL_SEND(HASH_DISTRIBUTED)->{[1]@localhost:1|[0],[1]@localhost:2|[1]}\n",
+          "            │       └── [2]@localhost:1|[0] PROJECT\n",
+          "            │           └── [2]@localhost:1|[0] JOIN\n",
+          "            │               ├── [2]@localhost:1|[0] 
MAIL_RECEIVE(HASH_DISTRIBUTED)\n",
+          "            │               │   ├── [3]@localhost:2|[1] 
MAIL_SEND(HASH_DISTRIBUTED)->{[1]@localhost:1|[0],[1]@localhost:2|[1],[2]@localhost:1|[0],[2]@localhost:2|[1]}
 (Subtree Omitted)\n",
+          "            │               │   └── [3]@localhost:1|[0] 
MAIL_SEND(HASH_DISTRIBUTED)->{[1]@localhost:1|[0],[1]@localhost:2|[1],[2]@localhost:1|[0],[2]@localhost:2|[1]}\n",
+          "            │               │       └── [3]@localhost:1|[0] 
PROJECT\n",
+          "            │               │           └── [3]@localhost:1|[0] 
TABLE SCAN (a) null\n",
+          "            │               └── [2]@localhost:1|[0] 
MAIL_RECEIVE(HASH_DISTRIBUTED)\n",
+          "            │                   └── [4]@localhost:1|[0] 
MAIL_SEND(HASH_DISTRIBUTED)->{[2]@localhost:1|[0],[2]@localhost:2|[1]}\n",
           "            │                       └── [4]@localhost:1|[0] 
PROJECT\n",
           "            │                           └── [4]@localhost:1|[0] 
TABLE SCAN (b) null\n",
-          "            └── [1]@localhost:2|[0] 
MAIL_RECEIVE(HASH_DISTRIBUTED)\n",
-          "                ├── [3]@localhost:1|[1] 
MAIL_SEND(HASH_DISTRIBUTED)->{[1]@localhost:1|[1],[1]@localhost:2|[0],[2]@localhost:1|[1],[2]@localhost:2|[0]}
 (Subtree Omitted)\n",
-          "                └── [3]@localhost:2|[0] 
MAIL_SEND(HASH_DISTRIBUTED)->{[1]@localhost:1|[1],[1]@localhost:2|[0],[2]@localhost:1|[1],[2]@localhost:2|[0]}\n",
-          "                    └── [3]@localhost:2|[0] PROJECT\n",
-          "                        └── [3]@localhost:2|[0] TABLE SCAN (a) 
null\n"
+          "            └── [1]@localhost:1|[0] 
MAIL_RECEIVE(HASH_DISTRIBUTED)\n",
+          "                ├── [3]@localhost:2|[1] 
MAIL_SEND(HASH_DISTRIBUTED)->{[1]@localhost:1|[0],[1]@localhost:2|[1],[2]@localhost:1|[0],[2]@localhost:2|[1]}
 (Subtree Omitted)\n",
+          "                └── [3]@localhost:1|[0] 
MAIL_SEND(HASH_DISTRIBUTED)->{[1]@localhost:1|[0],[1]@localhost:2|[1],[2]@localhost:1|[0],[2]@localhost:2|[1]}\n",
+          "                    └── [3]@localhost:1|[0] PROJECT\n",
+          "                        └── [3]@localhost:1|[0] TABLE SCAN (a) 
null\n",
+          ""
         ]
       },
-
       {
         "description": "explain plan with spool on CTE with extra filters",
         "sql": "SET useSpools=true; EXPLAIN IMPLEMENTATION PLAN FOR WITH 
mySpool AS (select * from a) SELECT 1 FROM mySpool as a1 JOIN b ON a1.col1 = 
b.col1 JOIN mySpool as a2 ON a2.col1 = b.col1 where a2.col2 > 0",
         "output": [
           "[0]@localhost:3|[0] MAIL_RECEIVE(BROADCAST_DISTRIBUTED)\n",
-          "├── [1]@localhost:1|[1] 
MAIL_SEND(BROADCAST_DISTRIBUTED)->{[0]@localhost:3|[0]} (Subtree Omitted)\n",
-          "└── [1]@localhost:2|[0] 
MAIL_SEND(BROADCAST_DISTRIBUTED)->{[0]@localhost:3|[0]}\n",
-          "    └── [1]@localhost:2|[0] PROJECT\n",
-          "        └── [1]@localhost:2|[0] JOIN\n",
-          "            ├── [1]@localhost:2|[0] 
MAIL_RECEIVE(HASH_DISTRIBUTED)\n",
-          "            │   ├── [2]@localhost:1|[1] 
MAIL_SEND(HASH_DISTRIBUTED)->{[1]@localhost:1|[1],[1]@localhost:2|[0]} (Subtree 
Omitted)\n",
-          "            │   └── [2]@localhost:2|[0] 
MAIL_SEND(HASH_DISTRIBUTED)->{[1]@localhost:1|[1],[1]@localhost:2|[0]}\n",
-          "            │       └── [2]@localhost:2|[0] PROJECT\n",
-          "            │           └── [2]@localhost:2|[0] JOIN\n",
-          "            │               ├── [2]@localhost:2|[0] 
MAIL_RECEIVE(HASH_DISTRIBUTED)\n",
-          "            │               │   ├── [3]@localhost:1|[1] 
MAIL_SEND(HASH_DISTRIBUTED)->{[2]@localhost:1|[1],[2]@localhost:2|[0]} (Subtree 
Omitted)\n",
-          "            │               │   └── [3]@localhost:2|[0] 
MAIL_SEND(HASH_DISTRIBUTED)->{[2]@localhost:1|[1],[2]@localhost:2|[0]}\n",
-          "            │               │       └── [3]@localhost:2|[0] 
PROJECT\n",
-          "            │               │           └── [3]@localhost:2|[0] 
TABLE SCAN (a) null\n",
-          "            │               └── [2]@localhost:2|[0] 
MAIL_RECEIVE(HASH_DISTRIBUTED)\n",
-          "            │                   └── [4]@localhost:1|[0] 
MAIL_SEND(HASH_DISTRIBUTED)->{[2]@localhost:1|[1],[2]@localhost:2|[0]}\n",
+          "├── [1]@localhost:2|[1] 
MAIL_SEND(BROADCAST_DISTRIBUTED)->{[0]@localhost:3|[0]} (Subtree Omitted)\n",
+          "└── [1]@localhost:1|[0] 
MAIL_SEND(BROADCAST_DISTRIBUTED)->{[0]@localhost:3|[0]}\n",
+          "    └── [1]@localhost:1|[0] PROJECT\n",
+          "        └── [1]@localhost:1|[0] JOIN\n",
+          "            ├── [1]@localhost:1|[0] 
MAIL_RECEIVE(HASH_DISTRIBUTED)\n",
+          "            │   ├── [2]@localhost:2|[1] 
MAIL_SEND(HASH_DISTRIBUTED)->{[1]@localhost:1|[0],[1]@localhost:2|[1]} (Subtree 
Omitted)\n",
+          "            │   └── [2]@localhost:1|[0] 
MAIL_SEND(HASH_DISTRIBUTED)->{[1]@localhost:1|[0],[1]@localhost:2|[1]}\n",
+          "            │       └── [2]@localhost:1|[0] PROJECT\n",
+          "            │           └── [2]@localhost:1|[0] JOIN\n",
+          "            │               ├── [2]@localhost:1|[0] 
MAIL_RECEIVE(HASH_DISTRIBUTED)\n",
+          "            │               │   ├── [3]@localhost:2|[1] 
MAIL_SEND(HASH_DISTRIBUTED)->{[2]@localhost:1|[0],[2]@localhost:2|[1]} (Subtree 
Omitted)\n",
+          "            │               │   └── [3]@localhost:1|[0] 
MAIL_SEND(HASH_DISTRIBUTED)->{[2]@localhost:1|[0],[2]@localhost:2|[1]}\n",
+          "            │               │       └── [3]@localhost:1|[0] 
PROJECT\n",
+          "            │               │           └── [3]@localhost:1|[0] 
TABLE SCAN (a) null\n",
+          "            │               └── [2]@localhost:1|[0] 
MAIL_RECEIVE(HASH_DISTRIBUTED)\n",
+          "            │                   └── [4]@localhost:1|[0] 
MAIL_SEND(HASH_DISTRIBUTED)->{[2]@localhost:1|[0],[2]@localhost:2|[1]}\n",
           "            │                       └── [4]@localhost:1|[0] 
PROJECT\n",
           "            │                           └── [4]@localhost:1|[0] 
TABLE SCAN (b) null\n",
-          "            └── [1]@localhost:2|[0] 
MAIL_RECEIVE(HASH_DISTRIBUTED)\n",
-          "                ├── [5]@localhost:1|[1] 
MAIL_SEND(HASH_DISTRIBUTED)->{[1]@localhost:1|[1],[1]@localhost:2|[0]} (Subtree 
Omitted)\n",
-          "                └── [5]@localhost:2|[0] 
MAIL_SEND(HASH_DISTRIBUTED)->{[1]@localhost:1|[1],[1]@localhost:2|[0]}\n",
-          "                    └── [5]@localhost:2|[0] PROJECT\n",
-          "                        └── [5]@localhost:2|[0] FILTER\n",
-          "                            └── [5]@localhost:2|[0] TABLE SCAN (a) 
null\n"
+          "            └── [1]@localhost:1|[0] 
MAIL_RECEIVE(HASH_DISTRIBUTED)\n",
+          "                ├── [5]@localhost:2|[1] 
MAIL_SEND(HASH_DISTRIBUTED)->{[1]@localhost:1|[0],[1]@localhost:2|[1]} (Subtree 
Omitted)\n",
+          "                └── [5]@localhost:1|[0] 
MAIL_SEND(HASH_DISTRIBUTED)->{[1]@localhost:1|[0],[1]@localhost:2|[1]}\n",
+          "                    └── [5]@localhost:1|[0] PROJECT\n",
+          "                        └── [5]@localhost:1|[0] FILTER\n",
+          "                            └── [5]@localhost:1|[0] TABLE SCAN (a) 
null\n",
+          ""
         ]
       },
       {
@@ -654,25 +660,25 @@
         "sql": "EXPLAIN IMPLEMENTATION PLAN FOR SELECT tmpA.col1, tmpA.cnt, 
tmpB.total FROM (SELECT a.col1, COUNT(*) AS cnt FROM a GROUP BY a.col1) tmpA 
JOIN (SELECT b.col1, SUM(b.col3) AS total FROM b GROUP BY b.col1) tmpB ON 
tmpA.col1 = tmpB.col1",
         "output": [
           "[0]@localhost:3|[0] MAIL_RECEIVE(BROADCAST_DISTRIBUTED)\n",
-          "├── [1]@localhost:1|[1] 
MAIL_SEND(BROADCAST_DISTRIBUTED)->{[0]@localhost:3|[0]} (Subtree Omitted)\n",
-          "└── [1]@localhost:2|[0] 
MAIL_SEND(BROADCAST_DISTRIBUTED)->{[0]@localhost:3|[0]}\n",
-          "    └── [1]@localhost:2|[0] PROJECT\n",
-          "        └── [1]@localhost:2|[0] JOIN\n",
-          "            ├── [1]@localhost:2|[0] 
MAIL_RECEIVE(HASH_DISTRIBUTED)\n",
-          "            │   ├── [2]@localhost:1|[1] 
MAIL_SEND(HASH_DISTRIBUTED)[PARTITIONED]->{[1]@localhost:1|[1]} (Subtree 
Omitted)\n",
-          "            │   └── [2]@localhost:2|[0] 
MAIL_SEND(HASH_DISTRIBUTED)[PARTITIONED]->{[1]@localhost:2|[0]}\n",
-          "            │       └── [2]@localhost:2|[0] AGGREGATE_FINAL\n",
-          "            │           └── [2]@localhost:2|[0] 
MAIL_RECEIVE(HASH_DISTRIBUTED)\n",
-          "            │               ├── [3]@localhost:1|[1] 
MAIL_SEND(HASH_DISTRIBUTED)->{[2]@localhost:1|[1],[2]@localhost:2|[0]} (Subtree 
Omitted)\n",
-          "            │               └── [3]@localhost:2|[0] 
MAIL_SEND(HASH_DISTRIBUTED)->{[2]@localhost:1|[1],[2]@localhost:2|[0]}\n",
-          "            │                   └── [3]@localhost:2|[0] 
AGGREGATE_LEAF\n",
-          "            │                       └── [3]@localhost:2|[0] TABLE 
SCAN (a) null\n",
-          "            └── [1]@localhost:2|[0] 
MAIL_RECEIVE(HASH_DISTRIBUTED)\n",
-          "                ├── [4]@localhost:1|[1] 
MAIL_SEND(HASH_DISTRIBUTED)[PARTITIONED]->{[1]@localhost:1|[1]} (Subtree 
Omitted)\n",
-          "                └── [4]@localhost:2|[0] 
MAIL_SEND(HASH_DISTRIBUTED)[PARTITIONED]->{[1]@localhost:2|[0]}\n",
-          "                    └── [4]@localhost:2|[0] AGGREGATE_FINAL\n",
-          "                        └── [4]@localhost:2|[0] 
MAIL_RECEIVE(HASH_DISTRIBUTED)\n",
-          "                            └── [5]@localhost:1|[0] 
MAIL_SEND(HASH_DISTRIBUTED)->{[4]@localhost:1|[1],[4]@localhost:2|[0]}\n",
+          "├── [1]@localhost:2|[1] 
MAIL_SEND(BROADCAST_DISTRIBUTED)->{[0]@localhost:3|[0]} (Subtree Omitted)\n",
+          "└── [1]@localhost:1|[0] 
MAIL_SEND(BROADCAST_DISTRIBUTED)->{[0]@localhost:3|[0]}\n",
+          "    └── [1]@localhost:1|[0] PROJECT\n",
+          "        └── [1]@localhost:1|[0] JOIN\n",
+          "            ├── [1]@localhost:1|[0] 
MAIL_RECEIVE(HASH_DISTRIBUTED)\n",
+          "            │   ├── [2]@localhost:2|[1] 
MAIL_SEND(HASH_DISTRIBUTED)[PARTITIONED]->{[1]@localhost:2|[1]} (Subtree 
Omitted)\n",
+          "            │   └── [2]@localhost:1|[0] 
MAIL_SEND(HASH_DISTRIBUTED)[PARTITIONED]->{[1]@localhost:1|[0]}\n",
+          "            │       └── [2]@localhost:1|[0] AGGREGATE_FINAL\n",
+          "            │           └── [2]@localhost:1|[0] 
MAIL_RECEIVE(HASH_DISTRIBUTED)\n",
+          "            │               ├── [3]@localhost:2|[1] 
MAIL_SEND(HASH_DISTRIBUTED)->{[2]@localhost:1|[0],[2]@localhost:2|[1]} (Subtree 
Omitted)\n",
+          "            │               └── [3]@localhost:1|[0] 
MAIL_SEND(HASH_DISTRIBUTED)->{[2]@localhost:1|[0],[2]@localhost:2|[1]}\n",
+          "            │                   └── [3]@localhost:1|[0] 
AGGREGATE_LEAF\n",
+          "            │                       └── [3]@localhost:1|[0] TABLE 
SCAN (a) null\n",
+          "            └── [1]@localhost:1|[0] 
MAIL_RECEIVE(HASH_DISTRIBUTED)\n",
+          "                ├── [4]@localhost:2|[1] 
MAIL_SEND(HASH_DISTRIBUTED)[PARTITIONED]->{[1]@localhost:2|[1]} (Subtree 
Omitted)\n",
+          "                └── [4]@localhost:1|[0] 
MAIL_SEND(HASH_DISTRIBUTED)[PARTITIONED]->{[1]@localhost:1|[0]}\n",
+          "                    └── [4]@localhost:1|[0] AGGREGATE_FINAL\n",
+          "                        └── [4]@localhost:1|[0] 
MAIL_RECEIVE(HASH_DISTRIBUTED)\n",
+          "                            └── [5]@localhost:1|[0] 
MAIL_SEND(HASH_DISTRIBUTED)->{[4]@localhost:1|[0],[4]@localhost:2|[1]}\n",
           "                                └── [5]@localhost:1|[0] 
AGGREGATE_LEAF\n",
           "                                    └── [5]@localhost:1|[0] TABLE 
SCAN (b) null\n",
           ""
@@ -684,25 +690,25 @@
         "sql": "EXPLAIN IMPLEMENTATION PLAN FOR SELECT tmpA.col2, tmpA.cnt, 
tmpB.total FROM (SELECT a.col2, COUNT(*) AS cnt FROM a GROUP BY a.col2) tmpA 
JOIN (SELECT b.col1, SUM(b.col3) AS total FROM b GROUP BY b.col1) tmpB ON 
tmpA.col2 = tmpB.col1",
         "output": [
           "[0]@localhost:3|[0] MAIL_RECEIVE(BROADCAST_DISTRIBUTED)\n",
-          "├── [1]@localhost:1|[1] 
MAIL_SEND(BROADCAST_DISTRIBUTED)->{[0]@localhost:3|[0]} (Subtree Omitted)\n",
-          "└── [1]@localhost:2|[0] 
MAIL_SEND(BROADCAST_DISTRIBUTED)->{[0]@localhost:3|[0]}\n",
-          "    └── [1]@localhost:2|[0] PROJECT\n",
-          "        └── [1]@localhost:2|[0] JOIN\n",
-          "            ├── [1]@localhost:2|[0] 
MAIL_RECEIVE(HASH_DISTRIBUTED)\n",
-          "            │   ├── [2]@localhost:1|[1] 
MAIL_SEND(HASH_DISTRIBUTED)[PARTITIONED]->{[1]@localhost:1|[1]} (Subtree 
Omitted)\n",
-          "            │   └── [2]@localhost:2|[0] 
MAIL_SEND(HASH_DISTRIBUTED)[PARTITIONED]->{[1]@localhost:2|[0]}\n",
-          "            │       └── [2]@localhost:2|[0] AGGREGATE_FINAL\n",
-          "            │           └── [2]@localhost:2|[0] 
MAIL_RECEIVE(HASH_DISTRIBUTED)\n",
-          "            │               ├── [3]@localhost:1|[1] 
MAIL_SEND(HASH_DISTRIBUTED)->{[2]@localhost:1|[1],[2]@localhost:2|[0]} (Subtree 
Omitted)\n",
-          "            │               └── [3]@localhost:2|[0] 
MAIL_SEND(HASH_DISTRIBUTED)->{[2]@localhost:1|[1],[2]@localhost:2|[0]}\n",
-          "            │                   └── [3]@localhost:2|[0] 
AGGREGATE_LEAF\n",
-          "            │                       └── [3]@localhost:2|[0] TABLE 
SCAN (a) null\n",
-          "            └── [1]@localhost:2|[0] 
MAIL_RECEIVE(HASH_DISTRIBUTED)\n",
-          "                ├── [4]@localhost:1|[1] 
MAIL_SEND(HASH_DISTRIBUTED)[PARTITIONED]->{[1]@localhost:1|[1]} (Subtree 
Omitted)\n",
-          "                └── [4]@localhost:2|[0] 
MAIL_SEND(HASH_DISTRIBUTED)[PARTITIONED]->{[1]@localhost:2|[0]}\n",
-          "                    └── [4]@localhost:2|[0] AGGREGATE_FINAL\n",
-          "                        └── [4]@localhost:2|[0] 
MAIL_RECEIVE(HASH_DISTRIBUTED)\n",
-          "                            └── [5]@localhost:1|[0] 
MAIL_SEND(HASH_DISTRIBUTED)->{[4]@localhost:1|[1],[4]@localhost:2|[0]}\n",
+          "├── [1]@localhost:2|[1] 
MAIL_SEND(BROADCAST_DISTRIBUTED)->{[0]@localhost:3|[0]} (Subtree Omitted)\n",
+          "└── [1]@localhost:1|[0] 
MAIL_SEND(BROADCAST_DISTRIBUTED)->{[0]@localhost:3|[0]}\n",
+          "    └── [1]@localhost:1|[0] PROJECT\n",
+          "        └── [1]@localhost:1|[0] JOIN\n",
+          "            ├── [1]@localhost:1|[0] 
MAIL_RECEIVE(HASH_DISTRIBUTED)\n",
+          "            │   ├── [2]@localhost:2|[1] 
MAIL_SEND(HASH_DISTRIBUTED)[PARTITIONED]->{[1]@localhost:2|[1]} (Subtree 
Omitted)\n",
+          "            │   └── [2]@localhost:1|[0] 
MAIL_SEND(HASH_DISTRIBUTED)[PARTITIONED]->{[1]@localhost:1|[0]}\n",
+          "            │       └── [2]@localhost:1|[0] AGGREGATE_FINAL\n",
+          "            │           └── [2]@localhost:1|[0] 
MAIL_RECEIVE(HASH_DISTRIBUTED)\n",
+          "            │               ├── [3]@localhost:2|[1] 
MAIL_SEND(HASH_DISTRIBUTED)->{[2]@localhost:1|[0],[2]@localhost:2|[1]} (Subtree 
Omitted)\n",
+          "            │               └── [3]@localhost:1|[0] 
MAIL_SEND(HASH_DISTRIBUTED)->{[2]@localhost:1|[0],[2]@localhost:2|[1]}\n",
+          "            │                   └── [3]@localhost:1|[0] 
AGGREGATE_LEAF\n",
+          "            │                       └── [3]@localhost:1|[0] TABLE 
SCAN (a) null\n",
+          "            └── [1]@localhost:1|[0] 
MAIL_RECEIVE(HASH_DISTRIBUTED)\n",
+          "                ├── [4]@localhost:2|[1] 
MAIL_SEND(HASH_DISTRIBUTED)[PARTITIONED]->{[1]@localhost:2|[1]} (Subtree 
Omitted)\n",
+          "                └── [4]@localhost:1|[0] 
MAIL_SEND(HASH_DISTRIBUTED)[PARTITIONED]->{[1]@localhost:1|[0]}\n",
+          "                    └── [4]@localhost:1|[0] AGGREGATE_FINAL\n",
+          "                        └── [4]@localhost:1|[0] 
MAIL_RECEIVE(HASH_DISTRIBUTED)\n",
+          "                            └── [5]@localhost:1|[0] 
MAIL_SEND(HASH_DISTRIBUTED)->{[4]@localhost:1|[0],[4]@localhost:2|[1]}\n",
           "                                └── [5]@localhost:1|[0] 
AGGREGATE_LEAF\n",
           "                                    └── [5]@localhost:1|[0] TABLE 
SCAN (b) null\n",
           ""


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to