github-actions[bot] commented on code in PR #65129:
URL: https://github.com/apache/doris/pull/65129#discussion_r3522988301


##########
fe/fe-core/src/main/java/org/apache/doris/nereids/properties/ChildOutputPropertyDeriver.java:
##########
@@ -440,53 +443,131 @@ public PhysicalProperties 
visitPhysicalSetOperation(PhysicalSetOperation setOper
             return PhysicalProperties.GATHER;
         }
 
-        // TODO: open comment when support `enable_local_shuffle_planner`
-        // int distributeToChildIndex
-        //         = 
setOperation.<Integer>getMutableState(PhysicalSetOperation.DISTRIBUTE_TO_CHILD_INDEX).orElse(-1);
-        // if (distributeToChildIndex >= 0
-        //         && childrenDistribution.get(distributeToChildIndex) 
instanceof DistributionSpecHash) {
-        //     DistributionSpecHash childDistribution
-        //             = (DistributionSpecHash) 
childrenDistribution.get(distributeToChildIndex);
-        //     List<SlotReference> childToIndex = 
setOperation.getRegularChildrenOutputs().get(distributeToChildIndex);
-        //     Map<ExprId, Integer> idToOutputIndex = new LinkedHashMap<>();
-        //     for (int j = 0; j < childToIndex.size(); j++) {
-        //         idToOutputIndex.put(childToIndex.get(j).getExprId(), j);
-        //     }
+        // When set-op bucket shuffle is chosen, the set operation keeps the 
basic child's bucket
+        // distribution as its own output so the bucket distribution 
propagates upward instead of
+        // being flattened to execution-bucketed. The basic child is 
recomputed from the children
+        // distributions instead of being carried as mutable planner state: 
mutable state does not
+        // survive the with-copies in chooseBestPlan() and the
+        // RecomputePhysicalPropertiesPostProcessor re-derivation, while the 
recomputation below is
+        // deterministic on any copy of the plan.
         //
-        //     List<ExprId> orderedShuffledColumns = 
childDistribution.getOrderedShuffledColumns();
-        //     List<ExprId> setOperationDistributeColumnIds = new 
ArrayList<>();
-        //     for (ExprId tableDistributeColumnId : orderedShuffledColumns) {
-        //         Integer index = 
idToOutputIndex.get(tableDistributeColumnId);
-        //         if (index == null) {
-        //             break;
-        //         }
-        //         
setOperationDistributeColumnIds.add(setOperation.getOutput().get(index).getExprId());
-        //     }
-        //     // check whether the set operation output all distribution 
columns of the child
-        //     if (setOperationDistributeColumnIds.size() == 
orderedShuffledColumns.size()) {
-        //         boolean isUnion = setOperation instanceof Union;
-        //         boolean shuffleToRight = distributeToChildIndex > 0;
-        //         if (!isUnion && shuffleToRight) {
-        //             return new PhysicalProperties(
-        //                     new DistributionSpecHash(
-        //                             setOperationDistributeColumnIds,
-        //                             ShuffleType.EXECUTION_BUCKETED
-        //                     )
-        //             );
-        //         } else {
-        //             // keep the distribution as the child
-        //             return new PhysicalProperties(
-        //                     new DistributionSpecHash(
-        //                             setOperationDistributeColumnIds,
-        //                             childDistribution.getShuffleType(),
-        //                             childDistribution.getTableId(),
-        //                             childDistribution.getSelectedIndexId(),
-        //                             childDistribution.getPartitionIds()
-        //                     )
-        //             );
-        //         }
-        //     }
-        // }
+        // The bucket-shuffle signature is structural: every child is 
hash-distributed by NATURAL
+        // or STORAGE_BUCKETED with the same storage layout (table / index / 
partitions — an
+        // enforced bucket-shuffle child carries the basic child's layout, see
+        // ChildrenPropertiesRegulator), and at least one child is 
STORAGE_BUCKETED. The layout
+        // equality rejects an un-enforced mix such as a NATURAL scan plus a 
lower bucket-shuffle
+        // plan distributed by another table's buckets (reachable through the 
ANY child request of
+        // union), and requiring one STORAGE_BUCKETED child rejects children 
that merely keep
+        // their own NATURAL distributions without any set-op enforcement. The 
basic child keeps
+        // its NATURAL distribution, so prefer the first NATURAL child and 
fall back to the first
+        // STORAGE_BUCKETED one (then every child shares the same layout, so 
the claim does not
+        // depend on which child the regulator actually picked).
+        int distributeToChildIndex = -1;
+        int firstNaturalIndex = -1;
+        int firstStorageBucketedIndex = -1;
+        boolean allChildrenBucketAligned = true;
+        DistributionSpecHash firstChildHash = null;
+        for (int i = 0; i < childrenDistribution.size(); i++) {
+            DistributionSpec childDistributionSpec = 
childrenDistribution.get(i);
+            if (!(childDistributionSpec instanceof DistributionSpecHash)) {
+                allChildrenBucketAligned = false;
+                break;
+            }
+            DistributionSpecHash childHash = (DistributionSpecHash) 
childDistributionSpec;
+            if (firstChildHash == null) {
+                firstChildHash = childHash;
+            } else if (childHash.getTableId() != firstChildHash.getTableId()
+                    || childHash.getSelectedIndexId() != 
firstChildHash.getSelectedIndexId()
+                    || 
!childHash.getPartitionIds().equals(firstChildHash.getPartitionIds())) {
+                allChildrenBucketAligned = false;
+                break;
+            }
+            ShuffleType childShuffleType = childHash.getShuffleType();
+            if (childShuffleType == ShuffleType.NATURAL) {
+                if (firstNaturalIndex < 0) {
+                    firstNaturalIndex = i;
+                }
+            } else if (childShuffleType == ShuffleType.STORAGE_BUCKETED) {
+                if (firstStorageBucketedIndex < 0) {
+                    firstStorageBucketedIndex = i;
+                }
+            } else {
+                allChildrenBucketAligned = false;
+                break;
+            }
+        }
+        if (allChildrenBucketAligned && firstStorageBucketedIndex >= 0) {
+            distributeToChildIndex = firstNaturalIndex >= 0 ? 
firstNaturalIndex : firstStorageBucketedIndex;
+        }
+        if (distributeToChildIndex >= 0) {
+            DistributionSpecHash childDistribution
+                    = (DistributionSpecHash) 
childrenDistribution.get(distributeToChildIndex);
+            // A shared storage layout alone does not prove alignment: two 
children may be
+            // bucketed by columns that feed different set-output positions 
(e.g. one child
+            // bucketed by the column feeding output k, another by the column 
feeding output v
+            // of the same table layout). Map every child's hash columns to 
set-output
+            // positions and require all children to land on the same 
positions in the same
+            // order; otherwise fall through to the generic derivation below.
+            List<Integer> outputPositions = null;
+            boolean allChildrenSamePositions = true;
+            for (int i = 0; i < childrenDistribution.size() && 
allChildrenSamePositions; i++) {
+                DistributionSpecHash childHash = (DistributionSpecHash) 
childrenDistribution.get(i);
+                List<SlotReference> childOutput = 
setOperation.getRegularChildrenOutputs().get(i);
+                Map<ExprId, Integer> idToOutputIndex = new LinkedHashMap<>();
+                for (int j = 0; j < childOutput.size(); j++) {
+                    idToOutputIndex.put(childOutput.get(j).getExprId(), j);
+                }
+                List<Integer> positions = new ArrayList<>();
+                for (ExprId shuffledColumnId : 
childHash.getOrderedShuffledColumns()) {
+                    Integer index = idToOutputIndex.get(shuffledColumnId);
+                    if (index == null) {
+                        allChildrenSamePositions = false;
+                        break;
+                    }
+                    positions.add(index);
+                }
+                if (!allChildrenSamePositions) {
+                    break;
+                }
+                if (outputPositions == null) {
+                    outputPositions = positions;
+                } else if (!outputPositions.equals(positions)) {
+                    allChildrenSamePositions = false;
+                }

Review Comment:
   There is still an unsafe fallthrough after this bucket-alignment proof 
rejects a case. A reduced shape is:
   
   ```text
   Agg(group by u.k)
     UnionAll(k)
       child0: STORAGE_BUCKETED on k, layout T1
       child1: STORAGE_BUCKETED on k, layout T2
   ```
   
   The layout or position checks above correctly reject that as an aligned 
set-op bucket shuffle, but rejection just falls through to the generic 
set-operation derivation below. For the one-column union, the generic loop sees 
identical output offsets and identical `STORAGE_BUCKETED` shuffle types, then 
returns `PhysicalProperties.createHash([u.k], STORAGE_BUCKETED)`. That 
constructor drops table/index/partition layout, and 
`DistributionSpecHash.satisfy(REQUIRE)` only checks expression containment, so 
an upstream aggregate or join can accept the union as hash-distributed on `u.k` 
and skip the realignment exchange even though the two children use different 
bucket functions.
   
   Please return a non-specific or `EXECUTION_BUCKETED` property when the 
bucket-alignment proof fails, or make the generic fallback validate storage 
layout before it returns a concrete `NATURAL` / `STORAGE_BUCKETED` hash 
property.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to