ankitsultana commented on code in PR #15439: URL: https://github.com/apache/pinot/pull/15439#discussion_r2027989547
########## pinot-query-planner/src/main/java/org/apache/pinot/calcite/rel/traits/TraitAssignment.java: ########## @@ -0,0 +1,252 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.calcite.rel.traits; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Preconditions; +import com.google.common.collect.ImmutableList; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.function.Supplier; +import org.apache.calcite.plan.RelTraitSet; +import org.apache.calcite.rel.RelCollation; +import org.apache.calcite.rel.RelCollations; +import org.apache.calcite.rel.RelDistribution; +import org.apache.calcite.rel.RelDistributions; +import org.apache.calcite.rel.RelNode; +import org.apache.calcite.rel.core.JoinInfo; +import org.apache.calcite.rel.core.Window; +import org.apache.pinot.calcite.rel.hint.PinotHintOptions; +import org.apache.pinot.calcite.rel.rules.PinotRuleUtils; +import org.apache.pinot.query.context.PhysicalPlannerContext; +import org.apache.pinot.query.planner.physical.v2.nodes.PhysicalAggregate; +import org.apache.pinot.query.planner.physical.v2.nodes.PhysicalJoin; +import 
org.apache.pinot.query.planner.physical.v2.nodes.PhysicalProject; +import org.apache.pinot.query.planner.physical.v2.nodes.PhysicalSort; +import org.apache.pinot.query.planner.physical.v2.nodes.PhysicalTableScan; +import org.apache.pinot.query.planner.physical.v2.nodes.PhysicalWindow; + + +/** + * Assign trait constraints to the plan. The Physical Planner should ensure that these constraints are met by + * inserting Exchange wherever required. This operates with Physical RelNodes because Calcite emits Logical RelNodes, + * many of which drop traits on copy. + */ +public class TraitAssignment { + private final Supplier<Integer> _planIdGenerator; + + public TraitAssignment(Supplier<Integer> planIdGenerator) { + _planIdGenerator = planIdGenerator; + } + + public static RelNode assign(RelNode relNode, PhysicalPlannerContext physicalPlannerContext) { + TraitAssignment traitAssignment = new TraitAssignment(physicalPlannerContext.getNodeIdGenerator()); + return traitAssignment.assign(relNode); + } + + public RelNode assign(RelNode node) { + // Process inputs first. + List<RelNode> newInputs = new ArrayList<>(); + for (RelNode input : node.getInputs()) { + newInputs.add(assign(input)); + } + node = node.copy(node.getTraitSet(), newInputs); + // Process current node. + if (node instanceof PhysicalSort) { + return assignSort((PhysicalSort) node); + } else if (node instanceof PhysicalJoin) { + return assignJoin((PhysicalJoin) node); + } else if (node instanceof PhysicalAggregate) { + return assignAggregate((PhysicalAggregate) node); + } else if (node instanceof PhysicalWindow) { + return assignWindow((PhysicalWindow) node); + } + return node; + } + + /** + * Sort is always computed by coalescing to a single stream. Hence, we add a SINGLETON trait to the sort input. 
+ */ + @VisibleForTesting + RelNode assignSort(PhysicalSort sort) { + RelNode input = sort.getInput(); + RelTraitSet newTraitSet = input.getTraitSet().plus(RelDistributions.SINGLETON); + input = input.copy(newTraitSet, input.getInputs()); + return sort.copy(sort.getTraitSet(), ImmutableList.of(input)); + } + + /** + * Handles lookup and dynamic filter for semi-join case separately. + * <p> + * TODO(mse-physical): Support colocated join hint. See + * <a href="https://github.com/apache/pinot/issues/15455">F2</a>). + * <br /> + * TODO(mse-physical): Instead of random exchange on the left, we should simply skip exchange. + * See <a href="https://github.com/apache/pinot/issues/15455">F3</a>. + * </p> + */ + @VisibleForTesting + RelNode assignJoin(PhysicalJoin join) { + // Case-1: Handle lookup joins. + if (PinotHintOptions.JoinHintOptions.useLookupJoinStrategy(join)) { + return assignLookupJoin(join); + } + // Case-2: Handle dynamic filter for semi joins. + JoinInfo joinInfo = join.analyzeCondition(); + if (join.isSemiJoin() && joinInfo.nonEquiConditions.isEmpty() && joinInfo.leftKeys.size() == 1) { + if (PinotRuleUtils.canPushDynamicBroadcastToLeaf(join.getLeft())) { + return assignDynamicFilterSemiJoin(join); + } + } + // Case-3: Default case. + RelDistribution leftDistribution = joinInfo.leftKeys.isEmpty() ? RelDistributions.RANDOM_DISTRIBUTED + : RelDistributions.hash(joinInfo.leftKeys); + RelDistribution rightDistribution = joinInfo.rightKeys.isEmpty() ? 
RelDistributions.BROADCAST_DISTRIBUTED + : RelDistributions.hash(joinInfo.rightKeys); + // left-input + RelNode leftInput = join.getInput(0); + RelTraitSet leftTraitSet = leftInput.getTraitSet().plus(leftDistribution); + leftInput = leftInput.copy(leftTraitSet, leftInput.getInputs()); + // right-input + RelNode rightInput = join.getInput(1); + RelTraitSet rightTraitSet = rightInput.getTraitSet().plus(rightDistribution); + rightInput = rightInput.copy(rightTraitSet, rightInput.getInputs()); + return join.copy(join.getTraitSet(), ImmutableList.of(leftInput, rightInput)); + } + + /** + * When group-by keys are empty, we can use SINGLETON distribution. Otherwise, we use hash distribution on the + * group-by keys. + */ + @VisibleForTesting Review Comment: Yeah I had started writing them but have stashed them for now. I want to get closer to the E2E working state before getting into unit-testing since I might have to move things around quite a bit. I am tracking UT for Trait Assignment as a todo here: #15456. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org