Jackie-Jiang commented on code in PR #11912: URL: https://github.com/apache/pinot/pull/11912#discussion_r1378006473
########## pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/PlanFragmenter.java: ########## @@ -126,49 +124,65 @@ public PlanNode visitExchange(ExchangeNode node, Context context) { if (!isPlanFragmentSplitter(node)) { return process(node, context); } - int currentPlanFragmentId = context._previousPlanFragmentId; - int nextPlanFragmentId = ++context._currentPlanFragmentId; - // Set previous PlanFragment ID in the context to be the next PlanFragment ID to be used by the child node. - context._previousPlanFragmentId = nextPlanFragmentId; - PlanNode nextPlanFragmentRoot = node.getInputs().get(0).visit(this, context); + // Split the ExchangeNode to a MailboxReceiveNode and a MailboxSendNode, where MailboxReceiveNode is the leaf node + // of the current PlanFragment, and MailboxSendNode is the root node of the next PlanFragment. + int receiverPlanFragmentId = context._currentPlanFragmentId; + int senderPlanFragmentId = context._nextPlanFragmentId.getAndIncrement(); + context._planFragmentIdToChildrenMap.computeIfAbsent(receiverPlanFragmentId, k -> new ArrayList<>()) + .add(senderPlanFragmentId); + + // Create a new context for the next PlanFragment with the new PlanFragment ID. + Context nextPlanFragmentContext = + new Context(senderPlanFragmentId, context._nextPlanFragmentId, context._planFragmentIdToRootNodeMap, + context._planFragmentIdToChildrenMap); + PlanNode nextPlanFragmentRoot = node.getInputs().get(0).visit(this, nextPlanFragmentContext); + + // Create a MailboxSendNode as the root node of the next PlanFragment. PinotRelExchangeType exchangeType = node.getExchangeType(); RelDistribution.Type distributionType = node.getDistributionType(); // NOTE: Only HASH_DISTRIBUTED requires distribution keys // TODO: Revisit ExchangeNode creation logic to avoid using HASH_DISTRIBUTED with empty distribution keys List<Integer> distributionKeys = distributionType == RelDistribution.Type.HASH_DISTRIBUTED ? 
node.getDistributionKeys() : null; - - PlanNode mailboxSender = - new MailboxSendNode(nextPlanFragmentId, nextPlanFragmentRoot.getDataSchema(), currentPlanFragmentId, + MailboxSendNode mailboxSendNode = + new MailboxSendNode(senderPlanFragmentId, nextPlanFragmentRoot.getDataSchema(), receiverPlanFragmentId, distributionType, exchangeType, distributionKeys, node.getCollations(), node.isSortOnSender()); - PlanNode mailboxReceiver = - new MailboxReceiveNode(currentPlanFragmentId, nextPlanFragmentRoot.getDataSchema(), nextPlanFragmentId, - distributionType, exchangeType, distributionKeys, node.getCollations(), node.isSortOnSender(), - node.isSortOnReceiver(), mailboxSender); - mailboxSender.addInput(nextPlanFragmentRoot); - - context._planFragmentIdToRootNodeMap.put(nextPlanFragmentId, - new PlanFragment(nextPlanFragmentId, mailboxSender, new PlanFragmentMetadata(), new ArrayList<>())); - if (!context._planFragmentIdToChildrenMap.containsKey(currentPlanFragmentId)) { - context._planFragmentIdToChildrenMap.put(currentPlanFragmentId, new ArrayList<>()); - } - context._planFragmentIdToChildrenMap.get(currentPlanFragmentId).add(nextPlanFragmentId); - - return mailboxReceiver; + mailboxSendNode.addInput(nextPlanFragmentRoot); + context._planFragmentIdToRootNodeMap.put(senderPlanFragmentId, + new PlanFragment(senderPlanFragmentId, mailboxSendNode, new PlanFragmentMetadata(), new ArrayList<>())); + + // Return the MailboxReceiveNode as the leaf node of the current PlanFragment. 
+ return new MailboxReceiveNode(receiverPlanFragmentId, nextPlanFragmentRoot.getDataSchema(), senderPlanFragmentId, + distributionType, exchangeType, distributionKeys, node.getCollations(), node.isSortOnSender(), + node.isSortOnReceiver(), mailboxSendNode); } private boolean isPlanFragmentSplitter(PlanNode node) { return ((ExchangeNode) node).getExchangeType() != PinotRelExchangeType.SUB_PLAN; } public static class Context { + final int _currentPlanFragmentId; + final AtomicInteger _nextPlanFragmentId; + final Map<Integer, PlanFragment> _planFragmentIdToRootNodeMap; + final Map<Integer, List<Integer>> _planFragmentIdToChildrenMap; + + public Context() { Review Comment: It is used in `PinotLogicalQueryPlanner` to create the initial `Context` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org