morningman commented on a change in pull request #4677: URL: https://github.com/apache/incubator-doris/pull/4677#discussion_r495541353
########## File path: fe/fe-core/src/main/java/org/apache/doris/planner/DistributedPlanner.java ########## @@ -390,6 +390,28 @@ private PlanFragment createHashJoinFragment(HashJoinNode node, PlanFragment righ node.setColocate(false, reason.get(0)); } + // bucket shuffle join is better than boradcast and shuffle join Review comment: ```suggestion // bucket shuffle join is better than broadcast and shuffle join ``` ########## File path: fe/fe-core/src/main/java/org/apache/doris/planner/DistributedPlanner.java ########## @@ -498,6 +520,72 @@ private boolean canColocateJoin(HashJoinNode node, PlanFragment leftChildFragmen return false; } + private boolean canBucketShuffleJoin(HashJoinNode node, PlanFragment leftChildFragment, + List<Expr> rhsHashExprs) { + if (!ConnectContext.get().getSessionVariable().isEnableBucketShuffleJoin()) { + return false; + } + // If user have a join hint to use proper way of join, can not be colocate join Review comment: ```suggestion // If user have a join hint to use proper way of join, can not be bucket join ``` ########## File path: fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java ########## @@ -1399,6 +1429,175 @@ public boolean isDone() { } + class BucketShuffleJoinController { Review comment: Add comment for this class ########## File path: fe/fe-core/src/main/java/org/apache/doris/planner/DistributedPlanner.java ########## @@ -498,6 +520,72 @@ private boolean canColocateJoin(HashJoinNode node, PlanFragment leftChildFragmen return false; } + private boolean canBucketShuffleJoin(HashJoinNode node, PlanFragment leftChildFragment, + List<Expr> rhsHashExprs) { + if (!ConnectContext.get().getSessionVariable().isEnableBucketShuffleJoin()) { + return false; + } + // If user have a join hint to use proper way of join, can not be colocate join + if (node.getInnerRef().hasJoinHints()) { + return false; + } + + PlanNode leftRoot = leftChildFragment.getPlanRoot(); + //leftRoot should be ScanNode or HashJoinNode, rightRoot should be 
ScanNode Review comment: This comment is wrong: the code below only accepts an OlapScanNode as the left root and never checks the right root. ########## File path: fe/fe-core/src/main/java/org/apache/doris/planner/DistributedPlanner.java ########## @@ -498,6 +520,72 @@ private boolean canColocateJoin(HashJoinNode node, PlanFragment leftChildFragmen return false; } + private boolean canBucketShuffleJoin(HashJoinNode node, PlanFragment leftChildFragment, + List<Expr> rhsHashExprs) { + if (!ConnectContext.get().getSessionVariable().isEnableBucketShuffleJoin()) { + return false; + } + // If user have a join hint to use proper way of join, can not be colocate join Review comment: Also, I think that if the user specifies a [SHUFFLE] hint, we should still try to do a bucket shuffle join. ########## File path: fe/fe-core/src/main/java/org/apache/doris/planner/DistributedPlanner.java ########## @@ -498,6 +520,72 @@ private boolean canColocateJoin(HashJoinNode node, PlanFragment leftChildFragmen return false; } + private boolean canBucketShuffleJoin(HashJoinNode node, PlanFragment leftChildFragment, + List<Expr> rhsHashExprs) { + if (!ConnectContext.get().getSessionVariable().isEnableBucketShuffleJoin()) { + return false; + } + // If user have a join hint to use proper way of join, can not be colocate join + if (node.getInnerRef().hasJoinHints()) { + return false; + } + + PlanNode leftRoot = leftChildFragment.getPlanRoot(); + //leftRoot should be ScanNode or HashJoinNode, rightRoot should be ScanNode + if (leftRoot instanceof OlapScanNode) { + return canBucketShuffleJoin(node, leftRoot, rhsHashExprs); + } + + return false; + } + + //the join expr must contian left table distribute column + private boolean canBucketShuffleJoin(HashJoinNode node, PlanNode leftRoot, + List<Expr> rhsJoinExprs) { + OlapScanNode leftScanNode = ((OlapScanNode) leftRoot); + + //1 the left table must be only one partition + if (leftScanNode.getSelectedPartitionIds().size() > 1) { + return false; + } + + DistributionInfo leftDistribution = leftScanNode.getOlapTable().getDefaultDistributionInfo(); + + if (leftDistribution
instanceof HashDistributionInfo ) { + List<Column> leftDistributeColumns = ((HashDistributionInfo) leftDistribution).getDistributionColumns(); + + List<Column> leftJoinColumns = new ArrayList<>(); + List<Expr> rightExprs = new ArrayList<>(); + List<BinaryPredicate> eqJoinConjuncts = node.getEqJoinConjuncts(); + + for (BinaryPredicate eqJoinPredicate : eqJoinConjuncts) { + Expr lhsJoinExpr = eqJoinPredicate.getChild(0); + Expr rhsJoinExpr = eqJoinPredicate.getChild(1); + if (lhsJoinExpr.unwrapSlotRef() == null || rhsJoinExpr.unwrapSlotRef() == null) { + continue; + } + + SlotDescriptor leftSlot = lhsJoinExpr.unwrapSlotRef().getDesc(); + + leftJoinColumns.add(leftSlot.getColumn()); + rightExprs.add(rhsJoinExpr); + } + + //2 the join columns should contains all left table distribute columns to enable bucket shuffle join + for (Column distributeColumn : leftDistributeColumns) { + int loc = leftJoinColumns.indexOf(distributeColumn); + // TODO: now support bucket shuffle join when distribute column type different with + // right expr type + if (loc == -1 || !rightExprs.get(loc).getType().equals(distributeColumn.getType())) { + return false; + } + rhsJoinExprs.add(rightExprs.get(loc)); + } + } Review comment: ```suggestion } else { return false; } ``` ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org