github-actions[bot] commented on code in PR #65129:
URL: https://github.com/apache/doris/pull/65129#discussion_r3523096872


##########
fe/fe-core/src/main/java/org/apache/doris/nereids/properties/ChildrenPropertiesRegulator.java:
##########
@@ -651,83 +652,109 @@ public List<List<PhysicalProperties>> visitPhysicalSetOperation(PhysicalSetOpera
         } else if (requiredDistributionSpec instanceof DistributionSpecHash) {
             // TODO: should use the most common hash spec as basic
             DistributionSpecHash basic = (DistributionSpecHash) requiredDistributionSpec;
-            // TODO: open comment when support `enable_local_shuffle_planner`
-            // int bucketShuffleBasicIndex = -1;
-            // double basicRowCount = -1;
-
-            // find the bucket shuffle basic index
-            // try {
-            //     ImmutableSet<ShuffleType> supportBucketShuffleTypes = ImmutableSet.of(
-            //             ShuffleType.NATURAL,
-            //             ShuffleType.STORAGE_BUCKETED
-            //     );
-            //     for (int i = 0; i < originChildrenProperties.size(); i++) {
-            //         PhysicalProperties originChildrenProperty = originChildrenProperties.get(i);
-            //         DistributionSpec childDistribution = originChildrenProperty.getDistributionSpec();
-            //         if (childDistribution instanceof DistributionSpecHash
-            //                 && supportBucketShuffleTypes.contains(
-            //                         ((DistributionSpecHash) childDistribution).getShuffleType())
-            //                 && !(isBucketShuffleDownGrade(setOperation.child(i)))) {
-            //             Statistics stats = setOperation.child(i).getStats();
-            //             double rowCount = stats.getRowCount();
-            //             if (rowCount > basicRowCount) {
-            //                 basicRowCount = rowCount;
-            //                 bucketShuffleBasicIndex = i;
-            //             }
-            //         }
-            //     }
-            // } catch (Throwable t) {
-            //     // catch stats exception
-            //     LOG.warn("Can not find the most (bucket num, rowCount): " + t, t);
-            //     bucketShuffleBasicIndex = -1;
-            // }
-
-            // use bucket shuffle
-            // if (bucketShuffleBasicIndex >= 0) {
-            //     DistributionSpecHash notShuffleSideRequire
-            //             = (DistributionSpecHash) requiredProperties.get(bucketShuffleBasicIndex)
-            //                   .getDistributionSpec();
-            //
-            //     DistributionSpecHash notNeedShuffleOutput
-            //             = (DistributionSpecHash) originChildrenProperties.get(bucketShuffleBasicIndex)
-            //                 .getDistributionSpec();
-            //
-            //     for (int i = 0; i < originChildrenProperties.size(); i++) {
-            //         DistributionSpecHash current
-            //                 = (DistributionSpecHash) originChildrenProperties.get(i).getDistributionSpec();
-            //         if (i == bucketShuffleBasicIndex) {
-            //             continue;
-            //         }
-            //
-            //         DistributionSpecHash currentRequire
-            //                 = (DistributionSpecHash) requiredProperties.get(i).getDistributionSpec();
-            //
-            //         PhysicalProperties target = calAnotherSideRequired(
-            //                 ShuffleType.STORAGE_BUCKETED,
-            //                 notNeedShuffleOutput, current,
-            //                 notShuffleSideRequire,
-            //                 currentRequire);
-            //         updateChildEnforceAndCost(i, target);
-            //     }
-            //     setOperation.setMutableState(
-            //         PhysicalSetOperation.DISTRIBUTE_TO_CHILD_INDEX, bucketShuffleBasicIndex);
-            // use partitioned shuffle
-            // } else {
-            for (int i = 0; i < originChildrenProperties.size(); i++) {
-                DistributionSpecHash current
-                        = (DistributionSpecHash) originChildrenProperties.get(i).getDistributionSpec();
-                if (current.getShuffleType() != ShuffleType.EXECUTION_BUCKETED
-                        || !bothSideShuffleKeysAreSameOrder(basic, current,
-                        (DistributionSpecHash) requiredProperties.get(0).getDistributionSpec(),
-                        (DistributionSpecHash) requiredProperties.get(i).getDistributionSpec())) {
-                    PhysicalProperties target = calAnotherSideRequired(
-                            ShuffleType.EXECUTION_BUCKETED, basic, current,
-                            (DistributionSpecHash) requiredProperties.get(0).getDistributionSpec(),
-                            (DistributionSpecHash) requiredProperties.get(i).getDistributionSpec());
+            int bucketShuffleBasicIndex = -1;
+            double basicRowCount = -1;
+
+            // Bucket shuffle for set operation is only valid when the FE plans the local
+            // shuffle: with the BE-side local-shuffle planner the backend cannot infer the
+            // correct local shuffle type for the set sink/probe and computes wrong results.
+            // It also requires the nereids distribute planner: the legacy coordinator only
+            // supports bucket-shuffle-partitioned sinks whose dest fragment contains a bucket
+            // shuffle join, so a bucket-shuffle set operation fragment cannot be scheduled there.
+            // Otherwise, keep bucketShuffleBasicIndex = -1 and fall back to the
+            // execution-bucketed (partitioned) shuffle below.
+            ConnectContext setOperationContext = ConnectContext.get();
+            boolean enableLocalShufflePlanner = setOperationContext != null
+                    && setOperationContext.getSessionVariable().isEnableLocalShuffle()
+                    && setOperationContext.getSessionVariable().isEnableLocalShufflePlanner()
+                    && SessionVariable.canUseNereidsDistributePlanner(setOperationContext);
+
+            // find the bucket shuffle basic index: the largest natural / storage-bucketed child
+            // keeps its bucket distribution, every other child is bucket-shuffled to it.
+            // isBucketShuffleDownGrade reuses the join-side heuristics on purpose, including
+            // the enable_bucket_shuffle_join switch and bucket_shuffle_downgrade_ratio: bucket
+            // shuffle for set operation belongs to the same optimization family as bucket
+            // shuffle join, so the join switches govern both instead of introducing a separate
+            // session variable.
+            if (enableLocalShufflePlanner) {
+                try {
+                    ImmutableSet<ShuffleType> supportBucketShuffleTypes = ImmutableSet.of(
+                            ShuffleType.NATURAL,
+                            ShuffleType.STORAGE_BUCKETED
+                    );
+                    for (int i = 0; i < originChildrenProperties.size(); i++) {
+                        PhysicalProperties originChildrenProperty = originChildrenProperties.get(i);
+                        DistributionSpec childDistribution = originChildrenProperty.getDistributionSpec();
+                        if (childDistribution instanceof DistributionSpecHash
+                                && supportBucketShuffleTypes.contains(

Review Comment:
   The set-op bucket-shuffle chooser still accepts `STORAGE_BUCKETED` children 
whose storage layout is unknown. That can happen for a child such as a hash 
join output that went through `withShuffleTypeAndForbidColocateJoin(...)`: it 
keeps `ShuffleType.STORAGE_BUCKETED` but clears `tableId` / `selectedIndexId` 
to `-1`.
   
   Reduced shape:
   
   ```text
   Intersect(k)
     child0: Project(k)
       HashJoin(...), output STORAGE_BUCKETED(tableId=-1)
     child1: OlapScan(k), output NATURAL(tableId=T)
   ```
   
   If child0 has the larger row count, this loop can pick it as 
`bucketShuffleBasicIndex` because it only checks the shuffle type. Lines 
734-738 then copy the `-1` layout into the enforced children. 
`ChildOutputPropertyDeriver` correctly refuses to prove a set-op bucketed 
output for `tableId < 0`, but the translator later marks the `SetOperationNode` 
`BUCKET_SHUFFLE` from 
`JoinUtils.isStorageBucketed(childPhysicalPlan.getPhysicalProperties())` alone, 
and `SetOperationNode.enforceAndDeriveLocalExchange` switches to bucket-hash 
local exchange/scheduling for a storage layout that was never proved.
   
   Please require a known storage layout before selecting a `STORAGE_BUCKETED` 
child as the set-op bucket-shuffle basic side (or before marking the translated 
node `BUCKET_SHUFFLE`), and fall back to execution-bucketed shuffle when the 
layout is unknown.
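   To make the suggested guard concrete, here is a minimal, self-contained Java sketch of the basic-side selection. `ChildSpec`, `hasKnownStorageLayout`, and the two `choose...` methods are hypothetical simplifications, not Doris APIs; the sketch omits the `isBucketShuffleDownGrade` check and the stats exception handling present in the real loop, and it assumes a negative `tableId` or `selectedIndexId` means the storage layout is unknown. Children with an unknown layout are skipped, and `-1` (the execution-bucketed fallback) is returned only when no candidate remains.

```java
import java.util.List;
import java.util.Set;

public class SetOpBucketShuffleChooser {
    // Hypothetical, simplified stand-in for Doris' ShuffleType.
    enum ShuffleType { NATURAL, STORAGE_BUCKETED, EXECUTION_BUCKETED }

    // Hypothetical, simplified view of one child's hash distribution plus its row count.
    static final class ChildSpec {
        final ShuffleType shuffleType;
        final long tableId;          // assumed -1 when the storage layout is unknown
        final long selectedIndexId;  // assumed -1 when the storage layout is unknown
        final double rowCount;

        ChildSpec(ShuffleType shuffleType, long tableId, long selectedIndexId, double rowCount) {
            this.shuffleType = shuffleType;
            this.tableId = tableId;
            this.selectedIndexId = selectedIndexId;
            this.rowCount = rowCount;
        }
    }

    private static final Set<ShuffleType> SUPPORTED =
            Set.of(ShuffleType.NATURAL, ShuffleType.STORAGE_BUCKETED);

    // Shape of the chooser in the diff: only the shuffle type is checked,
    // so a STORAGE_BUCKETED child with tableId = -1 can win on row count.
    static int chooseByShuffleTypeOnly(List<ChildSpec> children) {
        return choose(children, false);
    }

    // Guarded variant: a child qualifies only if its storage layout is known.
    static int chooseWithKnownLayout(List<ChildSpec> children) {
        return choose(children, true);
    }

    static boolean hasKnownStorageLayout(ChildSpec c) {
        return c.tableId >= 0 && c.selectedIndexId >= 0;
    }

    // Returns the index of the largest qualifying child, or -1 to signal the
    // execution-bucketed (partitioned) shuffle fallback.
    private static int choose(List<ChildSpec> children, boolean requireKnownLayout) {
        int basicIndex = -1;
        double basicRowCount = -1;
        for (int i = 0; i < children.size(); i++) {
            ChildSpec c = children.get(i);
            if (!SUPPORTED.contains(c.shuffleType)) {
                continue;
            }
            if (requireKnownLayout && !hasKnownStorageLayout(c)) {
                continue;
            }
            if (c.rowCount > basicRowCount) {
                basicRowCount = c.rowCount;
                basicIndex = i;
            }
        }
        return basicIndex;
    }

    public static void main(String[] args) {
        // Reduced shape from the review; ids and row counts are made up.
        List<ChildSpec> children = List.of(
                new ChildSpec(ShuffleType.STORAGE_BUCKETED, -1, -1, 1_000_000), // child0: join output
                new ChildSpec(ShuffleType.NATURAL, 10001, 10002, 1_000));        // child1: OlapScan
        System.out.println(chooseByShuffleTypeOnly(children)); // 0: unproved layout chosen
        System.out.println(chooseWithKnownLayout(children));   // 1: scan with known layout
    }
}
```

   Skipping unknown-layout children still lets a smaller child with a proven layout become the basic side; a more conservative variant could return `-1` as soon as the largest candidate's layout is unknown, which matches the "fall back to execution-bucketed shuffle" wording more literally.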



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.