morrySnow commented on code in PR #59006:
URL: https://github.com/apache/doris/pull/59006#discussion_r2645378761


##########
fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/algebra/SetOperation.java:
##########
@@ -34,6 +34,14 @@ enum Qualifier {
         DISTINCT
     }
 
+    /**
+     * join shuffle type
+     */
+    enum ShuffleType {
+        shuffle,
+        bucketShuffle

Review Comment:
   ```suggestion
           SHUFFLE,
           BUCKET_SHUFFLE
   ```



##########
fe/fe-core/src/main/java/org/apache/doris/nereids/properties/ChildrenPropertiesRegulator.java:
##########
@@ -649,19 +667,82 @@ public List<List<PhysicalProperties>> visitPhysicalSetOperation(PhysicalSetOpera
         } else if (requiredDistributionSpec instanceof DistributionSpecHash) {
             // TODO: should use the most common hash spec as basic
             DistributionSpecHash basic = (DistributionSpecHash) requiredDistributionSpec;
-            for (int i = 0; i < originChildrenProperties.size(); i++) {
-                DistributionSpecHash current
-                        = (DistributionSpecHash) originChildrenProperties.get(i).getDistributionSpec();
-                if (current.getShuffleType() != ShuffleType.EXECUTION_BUCKETED
-                        || !bothSideShuffleKeysAreSameOrder(basic, current,
-                        (DistributionSpecHash) requiredProperties.get(0).getDistributionSpec(),
-                        (DistributionSpecHash) requiredProperties.get(i).getDistributionSpec())) {
+            int distributeToChildIndex = -1;
+            double basicBuckets = -1;
+            double basicRowCount = -1;
+
+            try {
+                ImmutableSet<ShuffleType> supportedShuffleTypes = ImmutableSet.of(
+                        ShuffleType.NATURAL,
+                        ShuffleType.STORAGE_BUCKETED
+                );
+                // find the most (bucket num, rowCount) side as the basic
+                for (int i = 0; i < originChildrenProperties.size(); i++) {
+                    PhysicalProperties originChildrenProperty = originChildrenProperties.get(i);
+                    DistributionSpec childDistribution = originChildrenProperty.getDistributionSpec();
+                    if (childDistribution instanceof DistributionSpecHash
+                            && supportedShuffleTypes.contains(
+                                    ((DistributionSpecHash) childDistribution).getShuffleType())) {
+                        int bucketNum = getBucketNum(children.get(i).getPlan());
+                        Statistics stats = setOperation.child(i).getStats();
+                        double rowCount = stats.getRowCount();

Review Comment:
   We should select the child with the largest row count among all children
   whose bucket count is greater than the BE parallelism.
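
   The selection rule described in this comment can be sketched as follows. This
   is only an illustration under stated assumptions: `ChildInfo`,
   `selectBasicChild`, and the `beParallelism` parameter are hypothetical names
   and are not part of the Doris code base.

   ```java
   import java.util.List;

   final class BasicChildSelector {
       /** Hypothetical summary of one set-operation child; not a Doris class. */
       static final class ChildInfo {
           final int bucketNum;
           final double rowCount;

           ChildInfo(int bucketNum, double rowCount) {
               this.bucketNum = bucketNum;
               this.rowCount = rowCount;
           }
       }

       /**
        * Returns the index of the child with the most rows among the children
        * whose bucket count exceeds beParallelism, or -1 if no child qualifies.
        */
       static int selectBasicChild(List<ChildInfo> children, int beParallelism) {
           int best = -1;
           double bestRows = -1;
           for (int i = 0; i < children.size(); i++) {
               ChildInfo child = children.get(i);
               // Only children with enough buckets are candidates.
               if (child.bucketNum > beParallelism && child.rowCount > bestRows) {
                   bestRows = child.rowCount;
                   best = i;
               }
           }
           return best;
       }
   }
   ```

   Returning -1 mirrors the `distributeToChildIndex = -1` convention in the
   hunk above, so the caller can fall back when no child qualifies.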



##########
fe/fe-core/src/main/java/org/apache/doris/nereids/properties/ChildrenPropertiesRegulator.java:
##########
@@ -649,19 +667,82 @@ public List<List<PhysicalProperties>> visitPhysicalSetOperation(PhysicalSetOpera
         } else if (requiredDistributionSpec instanceof DistributionSpecHash) {
             // TODO: should use the most common hash spec as basic
             DistributionSpecHash basic = (DistributionSpecHash) requiredDistributionSpec;
-            for (int i = 0; i < originChildrenProperties.size(); i++) {
-                DistributionSpecHash current
-                        = (DistributionSpecHash) originChildrenProperties.get(i).getDistributionSpec();
-                if (current.getShuffleType() != ShuffleType.EXECUTION_BUCKETED
-                        || !bothSideShuffleKeysAreSameOrder(basic, current,
-                        (DistributionSpecHash) requiredProperties.get(0).getDistributionSpec(),
-                        (DistributionSpecHash) requiredProperties.get(i).getDistributionSpec())) {
+            int distributeToChildIndex = -1;
+            double basicBuckets = -1;
+            double basicRowCount = -1;
+
+            try {
+                ImmutableSet<ShuffleType> supportedShuffleTypes = ImmutableSet.of(
+                        ShuffleType.NATURAL,
+                        ShuffleType.STORAGE_BUCKETED
+                );
+                // find the most (bucket num, rowCount) side as the basic
+                for (int i = 0; i < originChildrenProperties.size(); i++) {
+                    PhysicalProperties originChildrenProperty = originChildrenProperties.get(i);
+                    DistributionSpec childDistribution = originChildrenProperty.getDistributionSpec();
+                    if (childDistribution instanceof DistributionSpecHash
+                            && supportedShuffleTypes.contains(
+                                    ((DistributionSpecHash) childDistribution).getShuffleType())) {

Review Comment:
   Why check this? We should support all shuffle types in this branch.



##########
fe/fe-core/src/main/java/org/apache/doris/nereids/properties/RequestPropertyDeriver.java:
##########
@@ -280,13 +280,13 @@ public Void visitPhysicalSetOperation(PhysicalSetOperation setOperation, PlanCon
             addRequestPropertyToChildren(createHashRequestAccordingToParent(
                     setOperation, distributionSpecHash, context));
         } else {
-            // shuffle all column
-            // TODO: for wide table, may be we should add a upper limit of shuffle columns

Review Comment:
   Why remove this TODO?



##########
fe/fe-core/src/main/java/org/apache/doris/planner/SetOperationNode.java:
##########
@@ -65,6 +65,12 @@ public abstract class SetOperationNode extends PlanNode {
 
     private boolean isColocate = false;
 
+    private DistributionMode distributionMode = DistributionMode.SHUFFLE;
+
+    public enum DistributionMode {

Review Comment:
   We should reuse the `DistributionMode` from `HashJoinNode`: move
   `DistributionMode` out of `HashJoinNode` so that both nodes can share it.
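
   A minimal sketch of what sharing one enum could look like, assuming it is
   lifted to a top-level type. `SharedDistributionMode`, `JoinNodeSketch`, and
   `SetOperationNodeSketch` are hypothetical names, and the constants listed are
   illustrative rather than the actual values defined in `HashJoinNode`.

   ```java
   /** Hypothetical shared enum; the constant names are illustrative only. */
   enum SharedDistributionMode {
       NONE, BROADCAST, PARTITIONED, BUCKET_SHUFFLE
   }

   /** Stand-in for a join node that uses the shared enum. */
   final class JoinNodeSketch {
       SharedDistributionMode distributionMode = SharedDistributionMode.NONE;
   }

   /** Stand-in for a set operation node that reuses the same enum. */
   final class SetOperationNodeSketch {
       SharedDistributionMode distributionMode = SharedDistributionMode.PARTITIONED;

       boolean isBucketShuffle() {
           return distributionMode == SharedDistributionMode.BUCKET_SHUFFLE;
       }
   }
   ```

   With a single type, code that inspects the distribution mode does not need
   to translate between two parallel enums.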



##########
fe/fe-core/src/main/java/org/apache/doris/planner/PlanFragment.java:
##########
@@ -177,6 +178,7 @@ public PlanFragment(PlanFragmentId id, PlanNode root, DataPartition partition) {
         this.builderRuntimeFilterIds = new HashSet<>();
         this.targetRuntimeFilterIds = new HashSet<>();
         this.hasBucketShuffleJoin = buildHasBucketShuffleJoin();
+        this.hasBucketShuffleSetOperation = buildHasBucketShuffleSetOperation();

Review Comment:
   Could we merge them into a single `hasBucketShuffleNode`?
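
   One possible shape for a merged check is a single traversal that does not
   care whether the node is a join or a set operation. This is a sketch only:
   `FragmentSketch.Node` and its `bucketShuffle` flag are hypothetical stand-ins,
   and the real `PlanFragment` and `PlanNode` APIs may differ.

   ```java
   import java.util.ArrayList;
   import java.util.List;

   final class FragmentSketch {
       /** Hypothetical plan node; real Doris nodes expose different APIs. */
       static final class Node {
           final boolean bucketShuffle; // set for a bucket shuffle join or set operation
           final List<Node> children = new ArrayList<>();

           Node(boolean bucketShuffle, Node... kids) {
               this.bucketShuffle = bucketShuffle;
               for (Node kid : kids) {
                   children.add(kid);
               }
           }
       }

       /** Returns true if root or any node below it is marked as bucket shuffle. */
       static boolean hasBucketShuffleNode(Node root) {
           if (root == null) {
               return false;
           }
           if (root.bucketShuffle) {
               return true;
           }
           for (Node child : root.children) {
               if (hasBucketShuffleNode(child)) {
                   return true;
               }
           }
           return false;
       }
   }
   ```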



##########
fe/fe-core/src/main/java/org/apache/doris/nereids/properties/ChildrenPropertiesRegulator.java:
##########
@@ -649,19 +667,82 @@ public List<List<PhysicalProperties>> visitPhysicalSetOperation(PhysicalSetOpera
         } else if (requiredDistributionSpec instanceof DistributionSpecHash) {
             // TODO: should use the most common hash spec as basic
             DistributionSpecHash basic = (DistributionSpecHash) requiredDistributionSpec;
-            for (int i = 0; i < originChildrenProperties.size(); i++) {
-                DistributionSpecHash current
-                        = (DistributionSpecHash) originChildrenProperties.get(i).getDistributionSpec();
-                if (current.getShuffleType() != ShuffleType.EXECUTION_BUCKETED
-                        || !bothSideShuffleKeysAreSameOrder(basic, current,
-                        (DistributionSpecHash) requiredProperties.get(0).getDistributionSpec(),
-                        (DistributionSpecHash) requiredProperties.get(i).getDistributionSpec())) {
+            int distributeToChildIndex = -1;
+            double basicBuckets = -1;
+            double basicRowCount = -1;
+
+            try {
+                ImmutableSet<ShuffleType> supportedShuffleTypes = ImmutableSet.of(
+                        ShuffleType.NATURAL,
+                        ShuffleType.STORAGE_BUCKETED
+                );
+                // find the most (bucket num, rowCount) side as the basic
+                for (int i = 0; i < originChildrenProperties.size(); i++) {
+                    PhysicalProperties originChildrenProperty = originChildrenProperties.get(i);

Review Comment:
   We should check that `children.get(i)` is not a `PhysicalDistribute`.
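
   The requested guard amounts to skipping any child that is itself a
   distribute node before it is considered as the basic side. A minimal sketch
   with stand-in types follows; `Plan`, `Scan`, and `Distribute` are
   hypothetical and only mimic the shape of the check, with `Distribute`
   playing the role of `PhysicalDistribute`.

   ```java
   import java.util.ArrayList;
   import java.util.List;

   final class DistributeFilter {
       /** Hypothetical plan marker interface; not the Nereids Plan type. */
       interface Plan {}

       static final class Scan implements Plan {}

       /** Stand-in for PhysicalDistribute. */
       static final class Distribute implements Plan {}

       /** Returns the indexes of children that may serve as the basic side. */
       static List<Integer> candidateIndexes(List<Plan> children) {
           List<Integer> result = new ArrayList<>();
           for (int i = 0; i < children.size(); i++) {
               if (children.get(i) instanceof Distribute) {
                   continue; // skip children that are distribute nodes
               }
               result.add(i);
           }
           return result;
       }
   }
   ```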



##########
fe/fe-core/src/main/java/org/apache/doris/nereids/properties/ChildrenPropertiesRegulator.java:
##########
@@ -649,19 +667,82 @@ public List<List<PhysicalProperties>> visitPhysicalSetOperation(PhysicalSetOpera
         } else if (requiredDistributionSpec instanceof DistributionSpecHash) {
             // TODO: should use the most common hash spec as basic
             DistributionSpecHash basic = (DistributionSpecHash) requiredDistributionSpec;
-            for (int i = 0; i < originChildrenProperties.size(); i++) {
-                DistributionSpecHash current
-                        = (DistributionSpecHash) originChildrenProperties.get(i).getDistributionSpec();
-                if (current.getShuffleType() != ShuffleType.EXECUTION_BUCKETED
-                        || !bothSideShuffleKeysAreSameOrder(basic, current,
-                        (DistributionSpecHash) requiredProperties.get(0).getDistributionSpec(),
-                        (DistributionSpecHash) requiredProperties.get(i).getDistributionSpec())) {
+            int distributeToChildIndex = -1;
+            double basicBuckets = -1;
+            double basicRowCount = -1;
+
+            try {
+                ImmutableSet<ShuffleType> supportedShuffleTypes = ImmutableSet.of(
+                        ShuffleType.NATURAL,
+                        ShuffleType.STORAGE_BUCKETED
+                );
+                // find the most (bucket num, rowCount) side as the basic
+                for (int i = 0; i < originChildrenProperties.size(); i++) {
+                    PhysicalProperties originChildrenProperty = originChildrenProperties.get(i);
+                    DistributionSpec childDistribution = originChildrenProperty.getDistributionSpec();
+                    if (childDistribution instanceof DistributionSpecHash
+                            && supportedShuffleTypes.contains(
+                                    ((DistributionSpecHash) childDistribution).getShuffleType())) {
+                        int bucketNum = getBucketNum(children.get(i).getPlan());
+                        Statistics stats = setOperation.child(i).getStats();
+                        double rowCount = stats.getRowCount();
+                        if (bucketNum > basicBuckets) {
+                            basicBuckets = bucketNum;
+                            basicRowCount = rowCount;
+                            distributeToChildIndex = i;
+                        } else if (bucketNum == basicBuckets && rowCount > basicRowCount) {
+                            basicRowCount = rowCount;
+                            distributeToChildIndex = i;
+                        }
+                    }
+                }
+            } catch (Throwable t) {
+                // catch stats exception
+                LOG.warn("Can not find the most (bucket num, rowCount): " + t, t);
+                distributeToChildIndex = -1;
+            }
+
+            if (distributeToChildIndex >= 0) {
+                DistributionSpecHash notShuffleSideRequire
+                        = (DistributionSpecHash) requiredProperties.get(distributeToChildIndex).getDistributionSpec();
+
+                DistributionSpecHash notNeedShuffleOutput
+                        = (DistributionSpecHash) originChildrenProperties.get(distributeToChildIndex)
+                            .getDistributionSpec();
+
+                for (int i = 0; i < originChildrenProperties.size(); i++) {
+                    DistributionSpecHash current
+                            = (DistributionSpecHash) originChildrenProperties.get(i).getDistributionSpec();
+                    if (i == distributeToChildIndex) {
+                        continue;
+                    }
+
+                    DistributionSpecHash currentRequire
+                            = (DistributionSpecHash) requiredProperties.get(i).getDistributionSpec();
+
                     PhysicalProperties target = calAnotherSideRequired(
-                            ShuffleType.EXECUTION_BUCKETED, basic, current,
-                            (DistributionSpecHash) requiredProperties.get(0).getDistributionSpec(),
-                            (DistributionSpecHash) requiredProperties.get(i).getDistributionSpec());
+                            ShuffleType.STORAGE_BUCKETED,
+                            notNeedShuffleOutput, current,
+                            notShuffleSideRequire,
+                            currentRequire);
                     updateChildEnforceAndCost(i, target);
                 }
+                setOperation.setMutableState(PhysicalSetOperation.DISTRIBUTE_TO_CHILD_INDEX, distributeToChildIndex);

Review Comment:
   Why is this needed? Please add a comment to explain it.



##########
fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java:
##########
@@ -2323,6 +2323,13 @@ && findOlapScanNodesByPassExchangeAndJoinNode(setOperationFragment.getPlanRoot()
             setOperationNode.setColocate(true);
         }
 
+        for (Plan child : setOperation.children()) {
+            PhysicalPlan childPhysicalPlan = (PhysicalPlan) child;
+            if (JoinUtils.isStorageBucketed(childPhysicalPlan.getPhysicalProperties())) {
+                setOperationNode.setDistributionMode(SetOperationNode.DistributionMode.BUCKET_SHUFFLE);

Review Comment:
   We should `break` here.
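
   Once one storage-bucketed child has been found, the mode cannot change, so
   the remaining children need not be inspected. A minimal sketch of the loop
   with the early exit, using hypothetical names (`BreakOnFirstMatch`, `Mode`,
   and a list of booleans standing in for the per-child
   `JoinUtils.isStorageBucketed(...)` result):

   ```java
   import java.util.List;

   final class BreakOnFirstMatch {
       /** Hypothetical mode enum for illustration only. */
       enum Mode { SHUFFLE, BUCKET_SHUFFLE }

       /** Each element says whether the corresponding child is storage bucketed. */
       static Mode resolveMode(List<Boolean> childIsStorageBucketed) {
           Mode mode = Mode.SHUFFLE;
           for (boolean bucketed : childIsStorageBucketed) {
               if (bucketed) {
                   mode = Mode.BUCKET_SHUFFLE;
                   break; // the result cannot change after the first match
               }
           }
           return mode;
       }
   }
   ```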



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

