walterddr commented on code in PR #10779:
URL: https://github.com/apache/pinot/pull/10779#discussion_r1198077928


##########
pinot-query-planner/src/main/java/org/apache/calcite/rel/rules/PinotJoinToDynamicBroadcastRule.java:
##########
@@ -0,0 +1,171 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.calcite.rel.rules;
+
+import com.google.common.collect.ImmutableList;
+import java.util.Collections;
+import java.util.List;
+import org.apache.calcite.plan.RelOptRule;
+import org.apache.calcite.plan.RelOptRuleCall;
+import org.apache.calcite.plan.hep.HepRelVertex;
+import org.apache.calcite.rel.RelDistributions;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.core.Join;
+import org.apache.calcite.rel.core.JoinInfo;
+import org.apache.calcite.rel.core.JoinRelType;
+import org.apache.calcite.rel.hint.PinotHintOptions;
+import org.apache.calcite.rel.hint.PinotHintStrategyTable;
+import org.apache.calcite.rel.logical.LogicalExchange;
+import org.apache.calcite.rel.logical.LogicalJoin;
+import org.apache.calcite.tools.RelBuilderFactory;
+import org.apache.zookeeper.common.StringUtils;
+
+
+/**
+ * Special rule for Pinot, this rule transposing an INNER JOIN into dynamic 
broadcast join for the leaf stage.
+ *
+ * <p>Consider the following INNER JOIN plan
+ *
+ *                  ...                                     ...
+ *                   |                                       |
+ *             [ Transform ]                           [ Transform ]
+ *                   |                                       |
+ *             [ Inner Join ]                     [ Pass-through xChange ]
+ *             /            \                          /
+ *        [xChange]      [xChange]              [Inner Join] <----
+ *           /                \                      /            \
+ *     [ Transform ]     [ Transform ]         [ Transform ]       \
+ *          |                  |                    |               \
+ *     [Proj/Filter]     [Proj/Filter]         [Proj/Filter]  <------\
+ *          |                  |                    |                 \
+ *     [Table Scan ]     [Table Scan ]         [Table Scan ]       [xChange]
+ *                                                                      \
+ *                                                                 [ Transform 
]
+ *                                                                       |
+ *                                                                 
[Proj/Filter]
+ *                                                                       |
+ *                                                                 [Table Scan 
]
+ *
+ * <p> This rule is one part of the overall mechanism and this rule only does 
the following
+ *
+ *                 ...                                      ...
+ *                  |                                        |
+ *            [ Transform ]                             [ Transform ]
+ *                  |                                       /
+ *            [ Inner Join ]                     [Pass-through xChange]
+ *            /            \                            /
+ *       [xChange]      [xChange]          |----[Dyn. Broadcast]   <-----
+ *          /                \             |           |                 \
+ *    [ Transform ]     [ Transform ]      |----> [ Inner Join ]      [xChange]
+ *         |                  |            |           |                   \
+ *    [Proj/Filter]     [Proj/Filter]      |      [ Transform ]       [ 
Transform ]
+ *         |                  |            |          |                    |
+ *    [Table Scan ]     [Table Scan ]      |----> [Proj/Filter]       
[Proj/Filter]
+ *                                                     |                    |
+ *                                                [Table Scan ]       [Table 
Scan ]
+ *
+ *
+ *
+ * <p> The next part to extend the Dynamic broadcast into the Proj/Filter 
operator happens in the runtime.
+ *
+ * <p> This rewrite is only useful if we can ensure that:
+ * <ul>
+ *   <li>new plan can leverage the indexes and fast filtering of the 
leaf-stage Pinot server processing logic.</li>
+ *   <li>
+ *     data sending over xChange should be relatively small comparing to data 
would've been selected out if the dynamic
+ *     broadcast is not applied.
+ *   </li>
+ *   <li>
+ *     since leaf-stage operator can only process a typical Pinot V1 engine 
chain-operator chain. This means the entire
+ *     right-table will be ship over and persist in memory before the dynamic 
broadcast can occur. memory foot print
+ *     will be high so this rule should be use carefully until we have 
cost-based optimization.
+ *   </li>
+ *   <li>
+ *     if the dynamic broadcast stage is broadcasting to a left-table that's 
pre-partitioned with the join key, then the
+ *     right-table will be ship over and persist in memory using hash 
distribution. memory foot print will be
+ *     relatively low comparing to non-partitioned.
+ *   </li>
+ *   <li>
+ *     if the dynamic broadcast stage operating on a table with the same 
partition scheme as the left-table, then there
+ *     is not going to be any network shuffling overhead. both memory and 
latency overhead will be low.
+ *   </li>
+ * </ul>
+ *
+ * TODO #1: Only support SEMI-JOIN, once JOIN operator is supported by 
leaf-stage we should allow it to match
+ *   @see <a href="https://github.com/apache/pinot/pull/10565/>
+ * TODO #2: Only convert to dynamic broadcast from right-to-left, allow option 
to specify dynamic broadcast direction.
+ * TODO #3: Only convert to dynamic broadcast if left is leaf stage, allow the 
option for intermediate stage.
+ * TODO #4: currently adding a pass-through after join, support leaf-stage to 
chain arbitrary operator(s) next.
+ */
+public class PinotJoinToDynamicBroadcastRule extends RelOptRule {
+  public static final PinotJoinToDynamicBroadcastRule INSTANCE =
+      new PinotJoinToDynamicBroadcastRule(PinotRuleUtils.PINOT_REL_FACTORY);
+  private static final String DYNAMIC_BROADCAST_HINT_OPTION_VALUE = 
"dynamic_broadcast";
+
+  public PinotJoinToDynamicBroadcastRule(RelBuilderFactory factory) {
+    super(operand(LogicalJoin.class, any()), factory, null);
+  }
+
+  @Override
+  public boolean matches(RelOptRuleCall call) {
+    if (call.rels.length < 1 || !(call.rel(0) instanceof Join)) {
+      return false;
+    }
+    Join join = call.rel(0);
+    String joinStrategyString = 
PinotHintStrategyTable.getHintOption(join.getHints(),
+        PinotHintOptions.JOIN_HINT_OPTIONS, 
PinotHintOptions.JoinHintOptions.JOIN_STRATEGY);
+    List<String> joinStrategies = joinStrategyString != null ? 
StringUtils.split(joinStrategyString, ",")
+        : Collections.emptyList();
+    if (!joinStrategies.contains(DYNAMIC_BROADCAST_HINT_OPTION_VALUE)) {
+      return false;
+    }
+    JoinInfo joinInfo = join.analyzeCondition();
+    RelNode left = join.getLeft() instanceof HepRelVertex ? ((HepRelVertex) 
join.getLeft()).getCurrentRel()
+        : join.getLeft();
+    RelNode right = join.getRight() instanceof HepRelVertex ? ((HepRelVertex) 
join.getRight()).getCurrentRel()
+        : join.getRight();
+    return left instanceof LogicalExchange && right instanceof LogicalExchange
+        && PinotRuleUtils.noExchangeInSubtree(left.getInput(0))
+        && (join.getJoinType() == JoinRelType.SEMI && 
joinInfo.nonEquiConditions.isEmpty());
+  }
+
+  @Override
+  public void onMatch(RelOptRuleCall call) {
+    Join join = call.rel(0);
+    LogicalExchange left = (LogicalExchange) (join.getLeft() instanceof 
HepRelVertex
+        ? ((HepRelVertex) join.getLeft()).getCurrentRel() : join.getLeft());
+    LogicalExchange right = (LogicalExchange) (join.getRight() instanceof 
HepRelVertex
+        ? ((HepRelVertex) join.getRight()).getCurrentRel() : join.getRight());
+
+    // TODO: both optimized hash empty list should be singleton exchange. 
however it is broken. fix later.
+    boolean isColocatedJoin = 
PinotHintStrategyTable.containsHintOption(join.getHints(),
+        PinotHintOptions.JOIN_HINT_OPTIONS, 
PinotHintOptions.JoinHintOptions.IS_COLOCATED_BY_JOIN_KEYS);
+    LogicalExchange dynamicBroadcastExchange = isColocatedJoin
+        ? LogicalExchange.create(right.getInput(), 
RelDistributions.hash(Collections.emptyList()))

Review Comment:
   TODO: we have to move this to hash with empty List (deterministically to 
1-server) instead of singleton. we need to clean up singleton before this can 
work. CC @xiangfu0 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org

Reply via email to