walterddr commented on code in PR #11912:
URL: https://github.com/apache/pinot/pull/11912#discussion_r1377683135


##########
pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/PlanFragmenter.java:
##########
@@ -126,49 +124,65 @@ public PlanNode visitExchange(ExchangeNode node, Context 
context) {
     if (!isPlanFragmentSplitter(node)) {
       return process(node, context);
     }
-    int currentPlanFragmentId = context._previousPlanFragmentId;
-    int nextPlanFragmentId = ++context._currentPlanFragmentId;
-    // Set previous PlanFragment ID in the context to be the next PlanFragment 
ID to be used by the child node.
-    context._previousPlanFragmentId = nextPlanFragmentId;
-    PlanNode nextPlanFragmentRoot = node.getInputs().get(0).visit(this, 
context);
 
+    // Split the ExchangeNode to a MailboxReceiveNode and a MailboxSendNode, 
where MailboxReceiveNode is the leave node
+    // of the current PlanFragment, and MailboxSendNode is the root node of 
the next PlanFragment.
+    int receiverPlanFragmentId = context._currentPlanFragmentId;
+    int senderPlanFragmentId = context._nextPlanFragmentId.getAndIncrement();

Review Comment:
   nit: the visitor pattern is single-threaded, so I don't think we need an 
AtomicInteger here. 
   
   This "getAndIncrement" is still confusing: if we create one context per 
visit upon hitting an exchange, then this is the only variable that is shared 
across all fragments. IIUC, the only reason this works is because 
nextPlanFragmentId is passed by reference, so it is not really contained 
within the context?



##########
pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/PlanFragmenter.java:
##########
@@ -126,49 +124,65 @@ public PlanNode visitExchange(ExchangeNode node, Context 
context) {
     if (!isPlanFragmentSplitter(node)) {
       return process(node, context);
     }
-    int currentPlanFragmentId = context._previousPlanFragmentId;
-    int nextPlanFragmentId = ++context._currentPlanFragmentId;
-    // Set previous PlanFragment ID in the context to be the next PlanFragment 
ID to be used by the child node.
-    context._previousPlanFragmentId = nextPlanFragmentId;
-    PlanNode nextPlanFragmentRoot = node.getInputs().get(0).visit(this, 
context);
 
+    // Split the ExchangeNode to a MailboxReceiveNode and a MailboxSendNode, 
where MailboxReceiveNode is the leave node
+    // of the current PlanFragment, and MailboxSendNode is the root node of 
the next PlanFragment.
+    int receiverPlanFragmentId = context._currentPlanFragmentId;
+    int senderPlanFragmentId = context._nextPlanFragmentId.getAndIncrement();
+    
context._planFragmentIdToChildrenMap.computeIfAbsent(receiverPlanFragmentId, k 
-> new ArrayList<>())
+        .add(senderPlanFragmentId);
+
+    // Create a new context for the next PlanFragment with the new 
PlanFragment ID.
+    Context nextPlanFragmentContext =
+        new Context(senderPlanFragmentId, context._nextPlanFragmentId, 
context._planFragmentIdToRootNodeMap,
+            context._planFragmentIdToChildrenMap);
+    PlanNode nextPlanFragmentRoot = node.getInputs().get(0).visit(this, 
nextPlanFragmentContext);
+
+    // Create a MailboxSendNode as the root node of the next PlanFragment.
     PinotRelExchangeType exchangeType = node.getExchangeType();
     RelDistribution.Type distributionType = node.getDistributionType();
     // NOTE: Only HASH_DISTRIBUTED requires distribution keys
     // TODO: Revisit ExchangeNode creation logic to avoid using 
HASH_DISTRIBUTED with empty distribution keys
     List<Integer> distributionKeys =
         distributionType == RelDistribution.Type.HASH_DISTRIBUTED ? 
node.getDistributionKeys() : null;
-
-    PlanNode mailboxSender =
-        new MailboxSendNode(nextPlanFragmentId, 
nextPlanFragmentRoot.getDataSchema(), currentPlanFragmentId,
+    MailboxSendNode mailboxSendNode =
+        new MailboxSendNode(senderPlanFragmentId, 
nextPlanFragmentRoot.getDataSchema(), receiverPlanFragmentId,
             distributionType, exchangeType, distributionKeys, 
node.getCollations(), node.isSortOnSender());
-    PlanNode mailboxReceiver =
-        new MailboxReceiveNode(currentPlanFragmentId, 
nextPlanFragmentRoot.getDataSchema(), nextPlanFragmentId,
-            distributionType, exchangeType, distributionKeys, 
node.getCollations(), node.isSortOnSender(),
-            node.isSortOnReceiver(), mailboxSender);
-    mailboxSender.addInput(nextPlanFragmentRoot);
-
-    context._planFragmentIdToRootNodeMap.put(nextPlanFragmentId,
-        new PlanFragment(nextPlanFragmentId, mailboxSender, new 
PlanFragmentMetadata(), new ArrayList<>()));
-    if 
(!context._planFragmentIdToChildrenMap.containsKey(currentPlanFragmentId)) {
-      context._planFragmentIdToChildrenMap.put(currentPlanFragmentId, new 
ArrayList<>());
-    }
-    
context._planFragmentIdToChildrenMap.get(currentPlanFragmentId).add(nextPlanFragmentId);
-
-    return mailboxReceiver;
+    mailboxSendNode.addInput(nextPlanFragmentRoot);
+    context._planFragmentIdToRootNodeMap.put(senderPlanFragmentId,
+        new PlanFragment(senderPlanFragmentId, mailboxSendNode, new 
PlanFragmentMetadata(), new ArrayList<>()));
+
+    // Return the MailboxReceiveNode as the leave node of the current 
PlanFragment.
+    return new MailboxReceiveNode(receiverPlanFragmentId, 
nextPlanFragmentRoot.getDataSchema(), senderPlanFragmentId,
+        distributionType, exchangeType, distributionKeys, 
node.getCollations(), node.isSortOnSender(),
+        node.isSortOnReceiver(), mailboxSendNode);
   }
 
   private boolean isPlanFragmentSplitter(PlanNode node) {
     return ((ExchangeNode) node).getExchangeType() != 
PinotRelExchangeType.SUB_PLAN;
   }
 
   public static class Context {
+    final int _currentPlanFragmentId;
+    final AtomicInteger _nextPlanFragmentId;
+    final Map<Integer, PlanFragment> _planFragmentIdToRootNodeMap;
+    final Map<Integer, List<Integer>> _planFragmentIdToChildrenMap;
+
+    public Context() {

Review Comment:
   Why do we need this public constructor? No one is using it, right?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org

Reply via email to