924060929 commented on code in PR #63366:
URL: https://github.com/apache/doris/pull/63366#discussion_r3452056959


##########
fe/fe-core/src/main/java/org/apache/doris/planner/RepeatNode.java:
##########
@@ -86,11 +90,30 @@ public String getNodeExplainString(String detailPrefix, TExplainLevel detailLeve
 
     // Determined by its child.
     @Override
-    public boolean isSerialOperator() {
-        return children.get(0).isSerialOperator();
+    public boolean isSerialNode() {
+        return children.get(0).isSerialNode();
     }
 
     public GroupingInfo getGroupingInfo() {
         return groupingInfo;
     }
+
+    @Override
+    public Pair<PlanNode, LocalExchangeType> enforceAndDeriveLocalExchange(
+            PlanTranslatorContext translatorContext, PlanNode parent, LocalExchangeTypeRequire parentRequire) {
+        // REPEAT (rollup/grouping sets) is NOT distribution-preserving: it NULLs grouping
+        // columns per set and produces GROUPING_ID, which is part of the downstream agg hash
+        // key but does not exist below the repeat. Forwarding the parent HASH require down
+        // would push the local exchange before the row expansion AND hash by the child
+        // distribution (a single upstream shuffle key) instead of the agg grouping_exprs,
+        // collapsing rows onto one instance (tpcds q67, +73%). Recurse with noRequire so the
+        // parent inserts its hash local exchange ABOVE the repeat using its own grouping_exprs
+        // (mirrors BE, whose RepeatOperatorX has a NOOP required_data_distribution).
+        Pair<PlanNode, LocalExchangeType> enforceResult
+                = enforceRequire(translatorContext, children.get(0), 0,
+                        LocalExchangeTypeRequire.noRequire());
+        children = new java.util.ArrayList<>();
+        children.add(enforceResult.first);

Review Comment:
   Thanks for the detailed analysis. I checked this against the BE native local-shuffle path, and I don't believe it's a correctness issue: forwarding the child's distribution here deliberately mirrors BE, and the local exchange above `Repeat` affects only parallelism.
   
   **The grouping-id correctness shuffle is a cross-fragment network exchange, not a local exchange.** Here is the `EXPLAIN` of `GROUP BY GROUPING SETS` over a bucketed table:
   
   ```
   Fragment 1:  VOlapScanNode -> VREPEAT_NODE (output: ..., GROUPING_ID)
                -> VAGGREGATE (update serialize, STREAMING)                 // partial pre-agg
                -> STREAM DATA SINK  HASH_PARTITIONED: <keys>, GROUPING_ID  // network shuffle
   Fragment 0:  VEXCHANGE -> VAGGREGATE (merge finalize)                    // final agg
   ```
   
   The aggregate directly above `Repeat` is a **streaming partial pre-aggregation**; it does not require its input to be partitioned by the grouping keys. Global correctness (each `(grouping exprs, GROUPING_ID)` group being merged in exactly one place) is guaranteed by the network shuffle (`HASH_PARTITIONED` by the grouping exprs + `GROUPING_ID`) that feeds the final merge aggregate in Fragment 0. A `LocalExchangeNode` above `Repeat` can therefore change only the partial agg's within-backend parallelism, never the result.
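   A minimal, self-contained Java sketch of why that holds, assuming a toy `SUM(v)` over `GROUPING SETS ((a, b), (a), ())` with illustrative `GROUPING_ID` values 0, 1 and 3. Every class and method name here is hypothetical, not Doris code. Rows leaving the repeat are spread over any number of partial-agg instances (round-robin, standing in for whatever a local exchange does or doesn't do), then the partial states are hash-partitioned by `(a, b, GROUPING_ID)` to the final merge aggregates; the merged result matches a single-instance run.

   ```java
   import java.util.ArrayList;
   import java.util.HashMap;
   import java.util.List;
   import java.util.Map;

   final class PartialAggSketch {
       record Row(String a, String b, long v) {}              // input row
       record Repeated(String a, String b, int gid, long v) {} // row after the toy REPEAT
       record Key(String a, String b, int gid) {}              // agg key incl. GROUPING_ID

       // Toy REPEAT for GROUPING SETS ((a, b), (a), ()): NULLs columns per set
       // and tags each copy with an illustrative GROUPING_ID.
       static List<Repeated> repeat(Row r) {
           return List.of(
                   new Repeated(r.a(), r.b(), 0, r.v()),
                   new Repeated(r.a(), null, 1, r.v()),
                   new Repeated(null, null, 3, r.v()));
       }

       // Streaming partial pre-aggregation on one instance: SUM(v) per key.
       static Map<Key, Long> partialAgg(List<Repeated> rows) {
           Map<Key, Long> out = new HashMap<>();
           for (Repeated r : rows) {
               out.merge(new Key(r.a(), r.b(), r.gid()), r.v(), Long::sum);
           }
           return out;
       }

       // `instances` models within-backend parallelism of the partial agg;
       // `receivers` models the final merge aggregates behind the network shuffle.
       static Map<Key, Long> run(List<Row> input, int instances, int receivers) {
           List<List<Repeated>> perInstance = new ArrayList<>();
           for (int i = 0; i < instances; i++) perInstance.add(new ArrayList<>());
           int next = 0;
           for (Row row : input) {
               for (Repeated r : repeat(row)) { // arbitrary (round-robin) placement
                   perInstance.get(next).add(r);
                   next = (next + 1) % instances;
               }
           }
           List<Map<Key, Long>> finals = new ArrayList<>();
           for (int i = 0; i < receivers; i++) finals.add(new HashMap<>());
           for (List<Repeated> rows : perInstance) {
               for (Map.Entry<Key, Long> e : partialAgg(rows).entrySet()) {
                   // HASH_PARTITIONED by (keys, GROUPING_ID), then merge.
                   int dest = Math.floorMod(e.getKey().hashCode(), receivers);
                   finals.get(dest).merge(e.getKey(), e.getValue(), Long::sum);
               }
           }
           Map<Key, Long> result = new HashMap<>();
           for (Map<Key, Long> receiver : finals) {
               for (Map.Entry<Key, Long> e : receiver.entrySet()) {
                   // Each key goes to exactly one receiver, so none is seen twice.
                   if (result.put(e.getKey(), e.getValue()) != null) {
                       throw new IllegalStateException("key split across receivers: " + e.getKey());
                   }
               }
           }
           return result;
       }

       public static void main(String[] args) {
           List<Row> input = List.of(
                   new Row("x", "p", 1), new Row("x", "q", 2),
                   new Row("y", "p", 3), new Row("x", "p", 4));
           Map<Key, Long> serial = run(input, 1, 1);
           Map<Key, Long> parallel = run(input, 3, 2);
           System.out.println(serial.equals(parallel));            // true
           System.out.println(serial.get(new Key(null, null, 3))); // 10
       }
   }
   ```

   In this model, changing `instances` changes which partial states exist but not the merged totals; only the `(keys, GROUPING_ID)` partitioning in front of the merge is load-bearing.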
   
   **BE's native path skips the exchange here too.** `Pipeline::need_to_local_exchange` (`be/src/exec/pipeline/pipeline.cpp`):
   
   ```cpp
   return _data_distribution.distribution_type != target_data_distribution.distribution_type &&
          !(is_hash_exchange(_data_distribution.distribution_type) &&
            is_hash_exchange(target_data_distribution.distribution_type));  // both hash -> skip
   ```
   
   For `Scan -> Repeat -> AggSink`, the pipeline's `_data_distribution` is the scan's `required_data_distribution`, i.e. `BUCKET_HASH_SHUFFLE` (`scan_operator.h`); `RepeatOperatorX` requires `NOOP` (no local exchange, LE); and the AggSink requires `GLOBAL_EXECUTION_HASH_SHUFFLE`. Both are hash distributions, so BE skips the local exchange above `Repeat`.
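   The quoted rule, transcribed into a small Java model for illustration. The enum and method names are hypothetical; only the return expression quoted above is modeled (any other checks the real `need_to_local_exchange` performs are left out), and which types count as hash is taken from this comment's description rather than from `is_hash_exchange` itself.

   ```java
   final class LocalExchangeRuleSketch {
       // Hypothetical enum: constant names come from the comment above, plus
       // NOOP and PASSTHROUGH for contrast. Not a real Doris Java API.
       enum DistributionType {
           NOOP, PASSTHROUGH, BUCKET_HASH_SHUFFLE, GLOBAL_EXECUTION_HASH_SHUFFLE
       }

       // Assumption: only the two hash types named in the comment count as hash.
       static boolean isHashExchange(DistributionType t) {
           return t == DistributionType.BUCKET_HASH_SHUFFLE
                   || t == DistributionType.GLOBAL_EXECUTION_HASH_SHUFFLE;
       }

       // Same shape as the quoted return expression: insert a local exchange only
       // when the types differ and they are not both hash distributions.
       static boolean needToLocalExchange(DistributionType current, DistributionType target) {
           return current != target && !(isHashExchange(current) && isHashExchange(target));
       }

       public static void main(String[] args) {
           // Scan -> Repeat -> AggSink, per the comment: the pipeline carries the
           // scan's BUCKET_HASH_SHUFFLE and the AggSink requires
           // GLOBAL_EXECUTION_HASH_SHUFFLE.
           System.out.println(needToLocalExchange(
                   DistributionType.BUCKET_HASH_SHUFFLE,
                   DistributionType.GLOBAL_EXECUTION_HASH_SHUFFLE)); // false: both hash
           System.out.println(needToLocalExchange(
                   DistributionType.PASSTHROUGH,
                   DistributionType.GLOBAL_EXECUTION_HASH_SHUFFLE)); // true
       }
   }
   ```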
   
   `RequireHash.satisfy(<any hash>) == true` on the FE side is the exact mirror of that BE rule, and `RepeatNode` forwarding its child's distribution is what lets the FE satisfy check see the same distribution that BE's `_data_distribution` holds. Returning `NOOP` instead would force a local exchange that BE never inserts, diverging from the native path.
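   A toy model of the FE side of that parity, under the same caveats: `Distribution`, `requireHashSatisfied`, `repeatOutput` and `parentInsertsLocalExchange` are hypothetical stand-ins for the FE's `LocalExchangeType`, `RequireHash.satisfy` and the parent's enforce step, not the real Doris classes. It contrasts `RepeatNode` forwarding its child's distribution with returning `NOOP`, for a hash-requiring aggregate above the repeat.

   ```java
   final class RepeatForwardingSketch {
       // Hypothetical, simplified distribution kinds (not the FE's LocalExchangeType).
       enum Distribution { NOOP, PASSTHROUGH, BUCKET_HASH_SHUFFLE, GLOBAL_EXECUTION_HASH_SHUFFLE }

       static boolean isHash(Distribution d) {
           return d == Distribution.BUCKET_HASH_SHUFFLE
                   || d == Distribution.GLOBAL_EXECUTION_HASH_SHUFFLE;
       }

       // Models "RequireHash.satisfy(<any hash>) == true" as described above.
       static boolean requireHashSatisfied(Distribution provided) {
           return isHash(provided);
       }

       // What the repeat reports upward: its child's distribution when
       // forwarding, otherwise NOOP.
       static Distribution repeatOutput(Distribution child, boolean forward) {
           return forward ? child : Distribution.NOOP;
       }

       // A hash-requiring parent inserts a local exchange only when the
       // distribution it observes does not satisfy its requirement.
       static boolean parentInsertsLocalExchange(Distribution child, boolean forward) {
           return !requireHashSatisfied(repeatOutput(child, forward));
       }

       public static void main(String[] args) {
           Distribution scan = Distribution.BUCKET_HASH_SHUFFLE;
           System.out.println(parentInsertsLocalExchange(scan, true));  // false: no LE, as in BE
           System.out.println(parentInsertsLocalExchange(scan, false)); // true: an LE BE never inserts
       }
   }
   ```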
   
   So I'll keep the current behavior. I'm happy to add a coverage case asserting that a hash-distributed child is forwarded and no local exchange is inserted above `Repeat`, to lock in the parity.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
