gortiz commented on code in PR #13966: URL: https://github.com/apache/pinot/pull/13966#discussion_r1753454415
########## pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/server/ServerPlanRequestVisitor.java: ##########

```diff
@@ -124,28 +128,41 @@ public Void visitFilter(FilterNode node, ServerPlanRequestContext context) {
   @Override
   public Void visitJoin(JoinNode node, ServerPlanRequestContext context) {
-    // visit only the static side, turn the dynamic side into a lookup from the pipeline breaker resultDataContainer
-    PlanNode staticSide = node.getInputs().get(0);
-    PlanNode dynamicSide = node.getInputs().get(1);
-    if (staticSide instanceof MailboxReceiveNode) {
-      dynamicSide = node.getInputs().get(0);
-      staticSide = node.getInputs().get(1);
-    }
-    if (visit(staticSide, context)) {
-      PipelineBreakerResult pipelineBreakerResult = context.getPipelineBreakerResult();
-      int resultMapId = pipelineBreakerResult.getNodeIdMap().get(dynamicSide);
-      List<TransferableBlock> transferableBlocks =
-          pipelineBreakerResult.getResultMap().getOrDefault(resultMapId, Collections.emptyList());
-      List<Object[]> resultDataContainer = new ArrayList<>();
-      DataSchema dataSchema = dynamicSide.getDataSchema();
-      for (TransferableBlock block : transferableBlocks) {
-        if (block.getType() == DataBlock.Type.ROW) {
-          resultDataContainer.addAll(block.getContainer());
+    // We can reach here for dynamic broadcast SEMI join and lookup join.
+    List<PlanNode> inputs = node.getInputs();
+    PlanNode left = inputs.get(0);
+    PlanNode right = inputs.get(1);
+
+    if (right instanceof MailboxReceiveNode
+        && ((MailboxReceiveNode) right).getExchangeType() == PinotRelExchangeType.PIPELINE_BREAKER) {
```

Review Comment:

Short answer: Yes.

Long answer: Calcite itself expects that. The root idea in Calcite is that rules should optimize logical operators (e.g. pushing filters into joins), apply distribution, etc., and then some final rules transform the logical operators (e.g. LogicalJoin) into specific joins. Calcite, for example, includes Enumerable operators that implement most logical operators.
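The final-rule step described above, where a logical join becomes one specific physical join, can be sketched in Java (17+, for records and sealed interfaces) as follows. Every type, flag and operator name here is hypothetical, not a Pinot or Calcite API; the boolean flags stand in for whatever traits the planner would really derive (table kind, exchange type, and so on). The sketch only illustrates the split: all conditions are checked once on the broker, and the server maps node types to operators without re-checking anything.

```java
import java.util.List;

// Hypothetical sketch: these types are NOT Pinot or Calcite classes.
final class JoinRuleSketch {
  enum JoinType { INNER, SEMI }

  // Logical join: says WHAT to compute. The boolean flags are hypothetical stand-ins
  // for traits the planner would derive (table kind, exchange type, ...).
  record LogicalJoin(JoinType type, List<Integer> leftKeys, List<Integer> rightKeys,
      boolean rightIsDimensionTable, boolean rightIsPipelineBreaker) { }

  // Physical joins: each one says HOW, and maps to exactly one executable operator.
  sealed interface PhysicalJoin permits HashJoin, NestedLoopJoin, LookupJoin, BroadcastSemiJoin { }
  record HashJoin(LogicalJoin logical) implements PhysicalJoin { }
  record NestedLoopJoin(LogicalJoin logical) implements PhysicalJoin { }
  record LookupJoin(LogicalJoin logical) implements PhysicalJoin { }
  record BroadcastSemiJoin(LogicalJoin logical) implements PhysicalJoin { }

  // Broker side: the "final rule". All conditions are checked here, exactly once.
  static PhysicalJoin implement(LogicalJoin join) {
    if (join.leftKeys().isEmpty()) {
      return new NestedLoopJoin(join); // no equi-keys: nothing to hash on
    }
    if (join.type() == JoinType.SEMI && join.rightIsPipelineBreaker()) {
      return new BroadcastSemiJoin(join);
    }
    if (join.rightIsDimensionTable()) {
      return new LookupJoin(join);
    }
    return new HashJoin(join);
  }

  // Server side: no inspection of inputs or exchange types, just node type -> operator.
  static String operatorFor(PhysicalJoin join) {
    if (join instanceof HashJoin) return "hash join operator";
    if (join instanceof NestedLoopJoin) return "nested loop join operator";
    if (join instanceof LookupJoin) return "lookup join operator";
    if (join instanceof BroadcastSemiJoin) return "broadcast semi join operator";
    throw new IllegalStateException("Unknown physical join: " + join);
  }
}
```

Note that `operatorFor` has no `instanceof MailboxReceiveNode`-style checks of its own: whatever the broker picked is what the server builds.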
For example, EnumerableNestedLoopJoin implements a nested loop join while EnumerableHashJoin implements a hash join, and EnumerableJoinRule can be used to decide which one should be used. We wouldn't use EnumerableJoinRule; instead we should have our own rule that decides whether to use a hash join, a semi join, a lookup join... and even more advanced joins, like ones that merge a join + limit or a join + aggregate. For example, imagine a tree like:

```
Aggregate (count by A.col1)
  Join
    select A
    select B
```

Right now Join emits (and allocates) a lot of rows just to have them aggregated by its parent. It would be more efficient to count at the same time we build the blocks. Obviously we are not going to apply this kind of optimization in the short term, but in the medium/long term it will be very effective. At the same time, it would be very error-prone to repeat the logic behind these optimizations both in Calcite (to prioritize the plans that can be optimized) and in ServerPlanRequestVisitor. Instead, _the Calcite way_ would be to generate the AggregateJoinRel node, and then we should be able to blindly generate the executable Pinot Operator whenever an AggregateJoinRel is received, without having to check conditions again (because we assume a friendly Broker that doesn't generate incorrect plans).

About serialization: by our own choice, we added an extra layer of JoinNodes (which I don't think is necessary) and a layer of gRPC (which makes more sense, but we could just remove the JoinNode layer and transform Calcite operators directly into gRPC). We could also use Calcite's JSON representation, but AFAIR we decided to use gRPC so that we don't depend on Calcite keeping that format backward compatible.
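AggregateJoinRel is only proposed in the comment above, so here is a hypothetical Java sketch of what such a fused operator could compute for the example tree. It assumes an inner equi-join on a single key, COUNT(*) grouped by one column of A, and rows represented as `Object[]`; the class, method and parameter names are made up for illustration. The build side keeps only a match count per join key instead of B's rows, and the probe side adds those counts straight into the groups, so no joined row is ever allocated.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical fused operator for the plan
//   Aggregate(COUNT(*) GROUP BY A.col1) over InnerJoin(A, B ON A.key = B.key).
// Not a Pinot class; rows are Object[] and column positions are assumed to come from the plan.
final class AggregateJoinSketch {
  static Map<Object, Long> countByGroup(List<Object[]> leftRows, int leftKeyIdx, int leftGroupIdx,
      List<Object[]> rightRows, int rightKeyIdx) {
    // Build side (B): keep only how many rows share each join key, not the rows themselves.
    Map<Object, Long> rightKeyCounts = new HashMap<>();
    for (Object[] row : rightRows) {
      Object key = row[rightKeyIdx];
      if (key != null) { // in SQL, NULL keys never match in an equi-join
        rightKeyCounts.merge(key, 1L, Long::sum);
      }
    }
    // Probe side (A): each row would produce `matches` joined rows, so add that number
    // to its group directly instead of emitting the joined rows to a separate Aggregate.
    Map<Object, Long> countsByGroup = new HashMap<>();
    for (Object[] row : leftRows) {
      Object key = row[leftKeyIdx];
      long matches = key == null ? 0L : rightKeyCounts.getOrDefault(key, 0L);
      if (matches > 0) {
        countsByGroup.merge(row[leftGroupIdx], matches, Long::sum);
      }
    }
    return countsByGroup;
  }
}
```

For example, with A rows (x, 1), (y, 1), (x, 2), (z, 3) (group column first, join key second) and B keys 1, 1, 2, the result is {x=3, y=2}: the same counts a plain join followed by COUNT(*) GROUP BY would produce, without materializing the five joined rows. A real operator would also need other aggregation functions and join types; this sketch covers only the COUNT case.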