ankitsultana commented on code in PR #15943: URL: https://github.com/apache/pinot/pull/15943#discussion_r2116814201
########## pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/v2/PlanFragmentAndMailboxAssignment.java: ########## @@ -48,112 +46,122 @@ import org.apache.pinot.query.routing.SharedMailboxInfos; +/** + * <h1>Responsibilities</h1> + * This does the following: + * <ul> + * <li>Splits plan around PhysicalExchange nodes to create plan fragments.</li> + * <li>Converts PRelNodes to PlanNodes.</li> + * <li> + * Creates mailboxes for connecting plan fragments. This is done simply based on the workers in the send/receive + * plan nodes, and the exchange strategy (identity, partitioning, etc.). + * </li> + * <li> + * Creates metadata for each plan fragment, which includes the scanned tables, unavailable segments, etc. + * </li> + * </ul> + * <h1>Design Note</h1> + * This class is completely un-opinionated. The old optimizer had a lot of custom logic added to mailbox assignment, + * but this class instead doesn't do any special handling, apart from assigning mailboxes based on the exchange + * strategy. This is an important and conscious design choice, because it ensures division of responsibilities and + * allows optimizer rules like worker assignment to completely own their responsibilities. This is also important for + * keeping the optimizer maximally pluggable. (e.g. you can swap out the default worker assignment rule with a + * custom rule like the LiteMode worker assignment rule). + */ public class PlanFragmentAndMailboxAssignment { private static final int ROOT_FRAGMENT_ID = 0; - private static final int FIRST_NON_ROOT_FRAGMENT_ID = 1; public Result compute(PRelNode rootPRelNode, PhysicalPlannerContext physicalPlannerContext) { - Preconditions.checkState(!(rootPRelNode.unwrap() instanceof Exchange), "root node should never be exchange"); - final DataSchema rootDataSchema = PRelToPlanNodeConverter.toDataSchema(rootPRelNode.unwrap().getRowType()); - // Create input fragment's send node. 
- MailboxSendNode sendNode = new MailboxSendNode(FIRST_NON_ROOT_FRAGMENT_ID, rootDataSchema, new ArrayList<>(), - ROOT_FRAGMENT_ID, PinotRelExchangeType.getDefaultExchangeType(), RelDistribution.Type.SINGLETON, - null, false, null, false); - // Create root receive node. - MailboxReceiveNode rootReceiveNode = new MailboxReceiveNode(ROOT_FRAGMENT_ID, rootDataSchema, - FIRST_NON_ROOT_FRAGMENT_ID, PinotRelExchangeType.getDefaultExchangeType(), - RelDistribution.Type.BROADCAST_DISTRIBUTED, null, null, false, false, sendNode); // Create the first two fragments. Context context = new Context(physicalPlannerContext); - PlanFragment rootFragment = createFragment(ROOT_FRAGMENT_ID, rootReceiveNode, new ArrayList<>(), context); - PlanFragment firstInputFragment = createFragment(FIRST_NON_ROOT_FRAGMENT_ID, sendNode, new ArrayList<>(), context); - rootFragment.getChildren().add(firstInputFragment); - QueryServerInstance brokerInstance = new QueryServerInstance(physicalPlannerContext.getInstanceId(), - physicalPlannerContext.getHostName(), physicalPlannerContext.getPort(), physicalPlannerContext.getPort()); - computeMailboxInfos(FIRST_NON_ROOT_FRAGMENT_ID, ROOT_FRAGMENT_ID, - createWorkerMap(rootPRelNode.getPinotDataDistributionOrThrow().getWorkers(), context), - ImmutableMap.of(0, brokerInstance), ExchangeStrategy.SINGLETON_EXCHANGE, context); // Traverse entire tree. - context._fragmentMetadataMap.get(ROOT_FRAGMENT_ID).setWorkerIdToServerInstanceMap(ImmutableMap.of( - 0, brokerInstance)); - visit(rootPRelNode, sendNode, firstInputFragment, context); + process(rootPRelNode, null, ROOT_FRAGMENT_ID, context); Result result = new Result(); result._fragmentMetadataMap = context._fragmentMetadataMap; result._planFragmentMap = context._planFragmentMap; return result; } - /** - * Invariants: 1. Parent PlanNode does not have current node in input yet. 2. This node is NOT the fragment root. This - * is because each fragment root is a MailboxSendNode. 
- */ - private void visit(PRelNode pRelNode, @Nullable PlanNode parent, PlanFragment currentFragment, Context context) { - int currentFragmentId = currentFragment.getFragmentId(); - DispatchablePlanMetadata fragmentMetadata = context._fragmentMetadataMap.get(currentFragmentId); - if (MapUtils.isEmpty(fragmentMetadata.getWorkerIdToServerInstanceMap())) { - // TODO: This is quite a complex invariant. - fragmentMetadata.setWorkerIdToServerInstanceMap(createWorkerMap( - pRelNode.getPinotDataDistributionOrThrow().getWorkers(), context)); - } + private void process(PRelNode pRelNode, @Nullable PlanNode parent, int currentFragmentId, Context context) { if (pRelNode.unwrap() instanceof TableScan) { - TableScanMetadata tableScanMetadata = Objects.requireNonNull(pRelNode.getTableScanMetadata(), - "No metadata in table scan PRelNode"); - String tableName = tableScanMetadata.getScannedTables().stream().findFirst().orElseThrow(); - if (!tableScanMetadata.getUnavailableSegmentsMap().isEmpty()) { - fragmentMetadata.addUnavailableSegments(tableName, - tableScanMetadata.getUnavailableSegmentsMap().get(tableName)); - } - fragmentMetadata.addScannedTable(tableName); - fragmentMetadata.setWorkerIdToSegmentsMap(tableScanMetadata.getWorkedIdToSegmentsMap()); - NodeHint nodeHint = NodeHint.fromRelHints(((TableScan) pRelNode.unwrap()).getHints()); - fragmentMetadata.setTableOptions(nodeHint.getHintOptions().get(PinotHintOptions.TABLE_HINT_OPTIONS)); - if (tableScanMetadata.getTimeBoundaryInfo() != null) { - fragmentMetadata.setTimeBoundaryInfo(tableScanMetadata.getTimeBoundaryInfo()); - } + processTableScan((PhysicalTableScan) pRelNode.unwrap(), currentFragmentId, context); } if (pRelNode.unwrap() instanceof PhysicalExchange) { + // Split an exchange into two fragments: one for the sender and one for the receiver. + // The sender fragment will have a MailboxSendNode and receiver a MailboxReceiveNode. + // It is possible that the receiver fragment doesn't exist yet (e.g. 
when PhysicalExchange is the root node). + // In that case, we also create it here. If it exists already, we simply re-use it. PhysicalExchange physicalExchange = (PhysicalExchange) pRelNode.unwrap(); - int senderFragmentId = context._planFragmentMap.size(); + PlanFragment receiverFragment = context._planFragmentMap.get(currentFragmentId); + int senderFragmentId = context._planFragmentMap.size() + (receiverFragment == null ? 1 : 0); final DataSchema inputFragmentSchema = PRelToPlanNodeConverter.toDataSchema( pRelNode.getPRelInput(0).unwrap().getRowType()); RelDistribution.Type distributionType = ExchangeStrategy.getRelDistribution( physicalExchange.getExchangeStrategy(), physicalExchange.getDistributionKeys()).getType(); - List<PlanNode> inputs = new ArrayList<>(); - MailboxSendNode sendNode = new MailboxSendNode(senderFragmentId, inputFragmentSchema, inputs, currentFragmentId, - PinotRelExchangeType.getDefaultExchangeType(), distributionType, physicalExchange.getDistributionKeys(), - false, physicalExchange.getRelCollation().getFieldCollations(), false /* todo: set sortOnSender */); + MailboxSendNode sendNode = new MailboxSendNode(senderFragmentId, inputFragmentSchema, new ArrayList<>(), + currentFragmentId, PinotRelExchangeType.getDefaultExchangeType(), distributionType, + physicalExchange.getDistributionKeys(), false, physicalExchange.getRelCollation().getFieldCollations(), + false /* todo: set sortOnSender */); MailboxReceiveNode receiveNode = new MailboxReceiveNode(currentFragmentId, inputFragmentSchema, senderFragmentId, PinotRelExchangeType.getDefaultExchangeType(), distributionType, physicalExchange.getDistributionKeys(), physicalExchange.getRelCollation().getFieldCollations(), false /* TODO: set sort on receiver */, false /* TODO: set sort on sender */, sendNode); - PlanFragment newPlanFragment = createFragment(senderFragmentId, sendNode, new ArrayList<>(), context); + if (receiverFragment == null) { + receiverFragment = createFragment(currentFragmentId, 
receiveNode, new ArrayList<>(), context, Review Comment: Found a bug. This sets the root of the fragment to receiveNode, but it should instead be the top-most node of the PRelNode tree. Will update soon. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org