walterddr commented on code in PR #11912: URL: https://github.com/apache/pinot/pull/11912#discussion_r1377683135
########## pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/PlanFragmenter.java: ########## @@ -126,49 +124,65 @@ public PlanNode visitExchange(ExchangeNode node, Context context) { if (!isPlanFragmentSplitter(node)) { return process(node, context); } - int currentPlanFragmentId = context._previousPlanFragmentId; - int nextPlanFragmentId = ++context._currentPlanFragmentId; - // Set previous PlanFragment ID in the context to be the next PlanFragment ID to be used by the child node. - context._previousPlanFragmentId = nextPlanFragmentId; - PlanNode nextPlanFragmentRoot = node.getInputs().get(0).visit(this, context); + // Split the ExchangeNode to a MailboxReceiveNode and a MailboxSendNode, where MailboxReceiveNode is the leave node + // of the current PlanFragment, and MailboxSendNode is the root node of the next PlanFragment. + int receiverPlanFragmentId = context._currentPlanFragmentId; + int senderPlanFragmentId = context._nextPlanFragmentId.getAndIncrement(); Review Comment: nit: the visitor pattern is single-threaded, so I don't think we need an `AtomicInteger` here. This `getAndIncrement` is still confusing: if we create one context per visit upon hitting an exchange, then this is the only variable that is shared across all fragments. IIUC, the only reason this works is because `nextPlanFragmentId` is passed by reference, so it is not really contained within the context? ########## pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/PlanFragmenter.java: ########## @@ -126,49 +124,65 @@ public PlanNode visitExchange(ExchangeNode node, Context context) { if (!isPlanFragmentSplitter(node)) { return process(node, context); } - int currentPlanFragmentId = context._previousPlanFragmentId; - int nextPlanFragmentId = ++context._currentPlanFragmentId; - // Set previous PlanFragment ID in the context to be the next PlanFragment ID to be used by the child node.
- context._previousPlanFragmentId = nextPlanFragmentId; - PlanNode nextPlanFragmentRoot = node.getInputs().get(0).visit(this, context); + // Split the ExchangeNode to a MailboxReceiveNode and a MailboxSendNode, where MailboxReceiveNode is the leave node + // of the current PlanFragment, and MailboxSendNode is the root node of the next PlanFragment. + int receiverPlanFragmentId = context._currentPlanFragmentId; + int senderPlanFragmentId = context._nextPlanFragmentId.getAndIncrement(); + context._planFragmentIdToChildrenMap.computeIfAbsent(receiverPlanFragmentId, k -> new ArrayList<>()) + .add(senderPlanFragmentId); + + // Create a new context for the next PlanFragment with the new PlanFragment ID. + Context nextPlanFragmentContext = + new Context(senderPlanFragmentId, context._nextPlanFragmentId, context._planFragmentIdToRootNodeMap, + context._planFragmentIdToChildrenMap); + PlanNode nextPlanFragmentRoot = node.getInputs().get(0).visit(this, nextPlanFragmentContext); + + // Create a MailboxSendNode as the root node of the next PlanFragment. PinotRelExchangeType exchangeType = node.getExchangeType(); RelDistribution.Type distributionType = node.getDistributionType(); // NOTE: Only HASH_DISTRIBUTED requires distribution keys // TODO: Revisit ExchangeNode creation logic to avoid using HASH_DISTRIBUTED with empty distribution keys List<Integer> distributionKeys = distributionType == RelDistribution.Type.HASH_DISTRIBUTED ? 
node.getDistributionKeys() : null; - - PlanNode mailboxSender = - new MailboxSendNode(nextPlanFragmentId, nextPlanFragmentRoot.getDataSchema(), currentPlanFragmentId, + MailboxSendNode mailboxSendNode = + new MailboxSendNode(senderPlanFragmentId, nextPlanFragmentRoot.getDataSchema(), receiverPlanFragmentId, distributionType, exchangeType, distributionKeys, node.getCollations(), node.isSortOnSender()); - PlanNode mailboxReceiver = - new MailboxReceiveNode(currentPlanFragmentId, nextPlanFragmentRoot.getDataSchema(), nextPlanFragmentId, - distributionType, exchangeType, distributionKeys, node.getCollations(), node.isSortOnSender(), - node.isSortOnReceiver(), mailboxSender); - mailboxSender.addInput(nextPlanFragmentRoot); - - context._planFragmentIdToRootNodeMap.put(nextPlanFragmentId, - new PlanFragment(nextPlanFragmentId, mailboxSender, new PlanFragmentMetadata(), new ArrayList<>())); - if (!context._planFragmentIdToChildrenMap.containsKey(currentPlanFragmentId)) { - context._planFragmentIdToChildrenMap.put(currentPlanFragmentId, new ArrayList<>()); - } - context._planFragmentIdToChildrenMap.get(currentPlanFragmentId).add(nextPlanFragmentId); - - return mailboxReceiver; + mailboxSendNode.addInput(nextPlanFragmentRoot); + context._planFragmentIdToRootNodeMap.put(senderPlanFragmentId, + new PlanFragment(senderPlanFragmentId, mailboxSendNode, new PlanFragmentMetadata(), new ArrayList<>())); + + // Return the MailboxReceiveNode as the leave node of the current PlanFragment. 
+ return new MailboxReceiveNode(receiverPlanFragmentId, nextPlanFragmentRoot.getDataSchema(), senderPlanFragmentId, + distributionType, exchangeType, distributionKeys, node.getCollations(), node.isSortOnSender(), + node.isSortOnReceiver(), mailboxSendNode); } private boolean isPlanFragmentSplitter(PlanNode node) { return ((ExchangeNode) node).getExchangeType() != PinotRelExchangeType.SUB_PLAN; } public static class Context { + final int _currentPlanFragmentId; + final AtomicInteger _nextPlanFragmentId; + final Map<Integer, PlanFragment> _planFragmentIdToRootNodeMap; + final Map<Integer, List<Integer>> _planFragmentIdToChildrenMap; + + public Context() { Review Comment: Why do we need this public constructor? No one is using it, right? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org