ankitsultana commented on code in PR #15943:
URL: https://github.com/apache/pinot/pull/15943#discussion_r2116814201


##########
pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/v2/PlanFragmentAndMailboxAssignment.java:
##########
@@ -48,112 +46,122 @@
 import org.apache.pinot.query.routing.SharedMailboxInfos;
 
 
+/**
+ * <h1>Responsibilities</h1>
+ * This does the following:
+ * <ul>
+ *   <li>Splits plan around PhysicalExchange nodes to create plan fragments.</li>
+ *   <li>Converts PRelNodes to PlanNodes.</li>
+ *   <li>
+ *     Creates mailboxes for connecting plan fragments. This is done simply based on the workers in the send/receive
+ *     plan nodes, and the exchange strategy (identity, partitioning, etc.).
+ *   </li>
+ *   <li>
+ *     Creates metadata for each plan fragment, which includes the scanned tables, unavailable segments, etc.
+ *   </li>
+ * </ul>
+ * <h1>Design Note</h1>
+ * This class is completely un-opinionated. The old optimizer had a lot of custom logic added to mailbox assignment,
+ * but this class instead doesn't do any special handling, apart from assigning mailboxes based on the exchange
+ * strategy. This is an important and conscious design choice, because it ensures division of responsibilities and
+ * allows optimizer rules like worker assignment to completely own their responsibilities. This is also important for
+ * keeping the optimizer maximally pluggable. (e.g. you can swap out the default worker assignment rule with a
+ * custom rule like the LiteMode worker assignment rule).
+ */
 public class PlanFragmentAndMailboxAssignment {
   private static final int ROOT_FRAGMENT_ID = 0;
-  private static final int FIRST_NON_ROOT_FRAGMENT_ID = 1;
 
   public Result compute(PRelNode rootPRelNode, PhysicalPlannerContext 
physicalPlannerContext) {
-    Preconditions.checkState(!(rootPRelNode.unwrap() instanceof Exchange), 
"root node should never be exchange");
-    final DataSchema rootDataSchema = 
PRelToPlanNodeConverter.toDataSchema(rootPRelNode.unwrap().getRowType());
-    // Create input fragment's send node.
-    MailboxSendNode sendNode = new MailboxSendNode(FIRST_NON_ROOT_FRAGMENT_ID, 
rootDataSchema, new ArrayList<>(),
-        ROOT_FRAGMENT_ID, PinotRelExchangeType.getDefaultExchangeType(), 
RelDistribution.Type.SINGLETON,
-        null, false, null, false);
-    // Create root receive node.
-    MailboxReceiveNode rootReceiveNode = new 
MailboxReceiveNode(ROOT_FRAGMENT_ID, rootDataSchema,
-        FIRST_NON_ROOT_FRAGMENT_ID, 
PinotRelExchangeType.getDefaultExchangeType(),
-        RelDistribution.Type.BROADCAST_DISTRIBUTED, null, null, false, false, 
sendNode);
     // Create the first two fragments.
     Context context = new Context(physicalPlannerContext);
-    PlanFragment rootFragment = createFragment(ROOT_FRAGMENT_ID, 
rootReceiveNode, new ArrayList<>(), context);
-    PlanFragment firstInputFragment = 
createFragment(FIRST_NON_ROOT_FRAGMENT_ID, sendNode, new ArrayList<>(), 
context);
-    rootFragment.getChildren().add(firstInputFragment);
-    QueryServerInstance brokerInstance = new 
QueryServerInstance(physicalPlannerContext.getInstanceId(),
-        physicalPlannerContext.getHostName(), 
physicalPlannerContext.getPort(), physicalPlannerContext.getPort());
-    computeMailboxInfos(FIRST_NON_ROOT_FRAGMENT_ID, ROOT_FRAGMENT_ID,
-        
createWorkerMap(rootPRelNode.getPinotDataDistributionOrThrow().getWorkers(), 
context),
-        ImmutableMap.of(0, brokerInstance), 
ExchangeStrategy.SINGLETON_EXCHANGE, context);
     // Traverse entire tree.
-    
context._fragmentMetadataMap.get(ROOT_FRAGMENT_ID).setWorkerIdToServerInstanceMap(ImmutableMap.of(
-        0, brokerInstance));
-    visit(rootPRelNode, sendNode, firstInputFragment, context);
+    process(rootPRelNode, null, ROOT_FRAGMENT_ID, context);
     Result result = new Result();
     result._fragmentMetadataMap = context._fragmentMetadataMap;
     result._planFragmentMap = context._planFragmentMap;
     return result;
   }
 
-  /**
-   * Invariants: 1. Parent PlanNode does not have current node in input yet. 
2. This node is NOT the fragment root. This
-   * is because each fragment root is a MailboxSendNode.
-   */
-  private void visit(PRelNode pRelNode, @Nullable PlanNode parent, 
PlanFragment currentFragment, Context context) {
-    int currentFragmentId = currentFragment.getFragmentId();
-    DispatchablePlanMetadata fragmentMetadata = 
context._fragmentMetadataMap.get(currentFragmentId);
-    if (MapUtils.isEmpty(fragmentMetadata.getWorkerIdToServerInstanceMap())) {
-      // TODO: This is quite a complex invariant.
-      fragmentMetadata.setWorkerIdToServerInstanceMap(createWorkerMap(
-          pRelNode.getPinotDataDistributionOrThrow().getWorkers(), context));
-    }
+  private void process(PRelNode pRelNode, @Nullable PlanNode parent, int 
currentFragmentId, Context context) {
     if (pRelNode.unwrap() instanceof TableScan) {
-      TableScanMetadata tableScanMetadata = 
Objects.requireNonNull(pRelNode.getTableScanMetadata(),
-          "No metadata in table scan PRelNode");
-      String tableName = 
tableScanMetadata.getScannedTables().stream().findFirst().orElseThrow();
-      if (!tableScanMetadata.getUnavailableSegmentsMap().isEmpty()) {
-        fragmentMetadata.addUnavailableSegments(tableName,
-            tableScanMetadata.getUnavailableSegmentsMap().get(tableName));
-      }
-      fragmentMetadata.addScannedTable(tableName);
-      
fragmentMetadata.setWorkerIdToSegmentsMap(tableScanMetadata.getWorkedIdToSegmentsMap());
-      NodeHint nodeHint = NodeHint.fromRelHints(((TableScan) 
pRelNode.unwrap()).getHints());
-      
fragmentMetadata.setTableOptions(nodeHint.getHintOptions().get(PinotHintOptions.TABLE_HINT_OPTIONS));
-      if (tableScanMetadata.getTimeBoundaryInfo() != null) {
-        
fragmentMetadata.setTimeBoundaryInfo(tableScanMetadata.getTimeBoundaryInfo());
-      }
+      processTableScan((PhysicalTableScan) pRelNode.unwrap(), 
currentFragmentId, context);
     }
     if (pRelNode.unwrap() instanceof PhysicalExchange) {
+      // Split an exchange into two fragments: one for the sender and one for 
the receiver.
+      // The sender fragment will have a MailboxSendNode and receiver a 
MailboxReceiveNode.
+      // It is possible that the receiver fragment doesn't exist yet (e.g. 
when PhysicalExchange is the root node).
+      // In that case, we also create it here. If it exists already, we simply 
re-use it.
       PhysicalExchange physicalExchange = (PhysicalExchange) pRelNode.unwrap();
-      int senderFragmentId = context._planFragmentMap.size();
+      PlanFragment receiverFragment = 
context._planFragmentMap.get(currentFragmentId);
+      int senderFragmentId = context._planFragmentMap.size() + 
(receiverFragment == null ? 1 : 0);
       final DataSchema inputFragmentSchema = 
PRelToPlanNodeConverter.toDataSchema(
           pRelNode.getPRelInput(0).unwrap().getRowType());
       RelDistribution.Type distributionType = 
ExchangeStrategy.getRelDistribution(
           physicalExchange.getExchangeStrategy(), 
physicalExchange.getDistributionKeys()).getType();
-      List<PlanNode> inputs = new ArrayList<>();
-      MailboxSendNode sendNode = new MailboxSendNode(senderFragmentId, 
inputFragmentSchema, inputs, currentFragmentId,
-          PinotRelExchangeType.getDefaultExchangeType(), distributionType, 
physicalExchange.getDistributionKeys(),
-          false, physicalExchange.getRelCollation().getFieldCollations(), 
false /* todo: set sortOnSender */);
+      MailboxSendNode sendNode = new MailboxSendNode(senderFragmentId, 
inputFragmentSchema, new ArrayList<>(),
+          currentFragmentId, PinotRelExchangeType.getDefaultExchangeType(), 
distributionType,
+          physicalExchange.getDistributionKeys(), false, 
physicalExchange.getRelCollation().getFieldCollations(),
+          false /* todo: set sortOnSender */);
       MailboxReceiveNode receiveNode = new 
MailboxReceiveNode(currentFragmentId, inputFragmentSchema,
           senderFragmentId, PinotRelExchangeType.getDefaultExchangeType(), 
distributionType,
           physicalExchange.getDistributionKeys(), 
physicalExchange.getRelCollation().getFieldCollations(),
           false /* TODO: set sort on receiver */, false /* TODO: set sort on 
sender */, sendNode);
-      PlanFragment newPlanFragment = createFragment(senderFragmentId, 
sendNode, new ArrayList<>(), context);
+      if (receiverFragment == null) {
+        receiverFragment = createFragment(currentFragmentId, receiveNode, new 
ArrayList<>(), context,

Review Comment:
   Found a bug: this sets the root of the receiver fragment to `receiveNode`, but the root should instead be the
   top-most node of the PRelNode tree for that fragment. Will update soon.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org

Reply via email to