gortiz commented on code in PR #13966:
URL: https://github.com/apache/pinot/pull/13966#discussion_r1753454415


##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/server/ServerPlanRequestVisitor.java:
##########
@@ -124,28 +128,41 @@ public Void visitFilter(FilterNode node, 
ServerPlanRequestContext context) {
 
   @Override
   public Void visitJoin(JoinNode node, ServerPlanRequestContext context) {
-    // visit only the static side, turn the dynamic side into a lookup from 
the pipeline breaker resultDataContainer
-    PlanNode staticSide = node.getInputs().get(0);
-    PlanNode dynamicSide = node.getInputs().get(1);
-    if (staticSide instanceof MailboxReceiveNode) {
-      dynamicSide = node.getInputs().get(0);
-      staticSide = node.getInputs().get(1);
-    }
-    if (visit(staticSide, context)) {
-      PipelineBreakerResult pipelineBreakerResult = 
context.getPipelineBreakerResult();
-      int resultMapId = pipelineBreakerResult.getNodeIdMap().get(dynamicSide);
-      List<TransferableBlock> transferableBlocks =
-          pipelineBreakerResult.getResultMap().getOrDefault(resultMapId, 
Collections.emptyList());
-      List<Object[]> resultDataContainer = new ArrayList<>();
-      DataSchema dataSchema = dynamicSide.getDataSchema();
-      for (TransferableBlock block : transferableBlocks) {
-        if (block.getType() == DataBlock.Type.ROW) {
-          resultDataContainer.addAll(block.getContainer());
+    // We can reach here for dynamic broadcast SEMI join and lookup join.
+    List<PlanNode> inputs = node.getInputs();
+    PlanNode left = inputs.get(0);
+    PlanNode right = inputs.get(1);
+
+    if (right instanceof MailboxReceiveNode
+        && ((MailboxReceiveNode) right).getExchangeType() == 
PinotRelExchangeType.PIPELINE_BREAKER) {

Review Comment:
   Short answer: Yes.
   
   Long answer:
   
   Calcite itself expects that. The root idea in Calcite is that rules should 
optimize logical rules (ie pushing filters into joins), apply distribution, etc 
and then some final rules will transform logical rules (ie LogicalJoin) into 
specific joins. Calcite for example includes Enumerable operators that 
implement most logical operators. For example, EnumerableMergeJoin implements a 
nested loop join while EnumerableHashJoin implements a hash join. 
EnumerableJoinRule can be used to decide which one should be used. We wouldn't 
use EnumerableJoinRule, instead we should have our own rule that decides 
whether to use hash join, semi join, lookup join... but even more advanced 
joins like ones that merge a join+limit or join+aggregate.
   
   For example imagine a tree like:
   ```
   Aggregate (count by A.col1)
     Join
       select A
       select B
   ```
   Right now Join emits (and allocate) a lot of rows just to be aggregated by 
its parent. It would be more efficient to count at the same time we build the 
blocks. Obviously we are not going to apply this kind of optimizations in the 
short term, but in the medium/large will be very effective and at the same time 
would be very error prone to repeat the logic that creates these optimizations 
in both Calcite (to prioritize the plans that can be optimized) and then in 
ServerPlanRequestVisitor.
   
   Instead the _Calcite's way_ should be to generate the AggregateJoinRel node 
and then we should be able to blindly generate the executable Pinot Operator 
whenever a AggregateJoinRel is received, without having to check conditions 
again (because we assume a friendly Broker that doesn't generate incorrect 
plans). 
   
   About serialization:
   
   Due to our own decisions we decided to add an extra layer of JoinNodes 
(which I don't think they are necessary) and a layer of GRPC (which makes more 
sense, but we could just remove the JoinNode layer and transform Calcite 
operators directly into GRPC). We could also use the JSON representation of 
Calcite, but AFAIR we decided to use GRPC to do not depend on Calcite breaking 
backward compatibility.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org

Reply via email to