This is an automated email from the ASF dual-hosted git repository.

jakevin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 0882223dcb6 [fix](Nereids): fix offset in PlanTranslator (#29789)
0882223dcb6 is described below

commit 0882223dcb6054975e02e76223db94cc533947df
Author: jakevin <jakevin...@gmail.com>
AuthorDate: Mon Jan 15 17:34:08 2024 +0800

    [fix](Nereids): fix offset in PlanTranslator (#29789)
    
    Current BE operators don't support `offset`, so we need to add the offset into 
`ExchangeNode`
---
 .../glue/translator/PhysicalPlanTranslator.java    | 23 +++++++-----
 .../processor/post/AddOffsetIntoDistribute.java    | 41 ++++++++++++++++++++++
 .../nereids/processor/post/PlanPostProcessors.java |  1 +
 .../trees/plans/physical/PhysicalLimit.java        |  5 +++
 4 files changed, 61 insertions(+), 9 deletions(-)

diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java
index 6270c5795b3..b20d0738140 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java
@@ -268,13 +268,14 @@ public class PhysicalPlanTranslator extends 
DefaultPlanVisitor<PlanFragment, Pla
     @Override
     public PlanFragment visitPhysicalDistribute(PhysicalDistribute<? extends 
Plan> distribute,
             PlanTranslatorContext context) {
-        PlanFragment inputFragment = distribute.child().accept(this, context);
-        List<List<Expr>> distributeExprLists = 
getDistributeExprs(distribute.child());
+        Plan child = distribute.child();
+        PlanFragment inputFragment = child.accept(this, context);
+        List<List<Expr>> distributeExprLists = getDistributeExprs(child);
         // TODO: why need set streaming here? should remove this.
         if (inputFragment.getPlanRoot() instanceof AggregationNode
-                && distribute.child() instanceof PhysicalHashAggregate
-                && context.getFirstAggregateInFragment(inputFragment) == 
distribute.child()) {
-            PhysicalHashAggregate<?> hashAggregate = 
(PhysicalHashAggregate<?>) distribute.child();
+                && child instanceof PhysicalHashAggregate
+                && context.getFirstAggregateInFragment(inputFragment) == 
child) {
+            PhysicalHashAggregate<?> hashAggregate = 
(PhysicalHashAggregate<?>) child;
             if (hashAggregate.getAggPhase() == AggPhase.LOCAL
                     && hashAggregate.getAggMode() == AggMode.INPUT_TO_BUFFER) {
                 AggregationNode aggregationNode = (AggregationNode) 
inputFragment.getPlanRoot();
@@ -285,23 +286,26 @@ public class PhysicalPlanTranslator extends 
DefaultPlanVisitor<PlanFragment, Pla
         ExchangeNode exchangeNode = new ExchangeNode(context.nextPlanNodeId(), 
inputFragment.getPlanRoot());
         updateLegacyPlanIdToPhysicalPlan(exchangeNode, distribute);
         List<ExprId> validOutputIds = distribute.getOutputExprIds();
-        if (distribute.child() instanceof PhysicalHashAggregate) {
+        if (child instanceof PhysicalHashAggregate) {
             // we must add group by keys to output list,
             // otherwise we could not process aggregate's output without group 
by keys
-            List<ExprId> keys = ((PhysicalHashAggregate<?>) 
distribute.child()).getGroupByExpressions().stream()
+            List<ExprId> keys = ((PhysicalHashAggregate<?>) 
child).getGroupByExpressions().stream()
                     .filter(SlotReference.class::isInstance)
                     .map(SlotReference.class::cast)
                     .map(SlotReference::getExprId)
                     .collect(Collectors.toList());
             keys.addAll(validOutputIds);
             validOutputIds = keys;
+        } else if (child instanceof PhysicalLimit && ((PhysicalLimit<?>) 
child).getPhase().isGlobal()) {
+            // because sort already contains Offset, we don't need to handle 
PhysicalTopN
+            exchangeNode.setOffset(((PhysicalLimit<?>) child).getOffset());
         }
         if (inputFragment instanceof MultiCastPlanFragment) {
             // TODO: remove this logic when we split to multi-window in 
logical window to physical window conversion
             MultiCastDataSink multiCastDataSink = (MultiCastDataSink) 
inputFragment.getSink();
             DataStreamSink dataStreamSink = 
multiCastDataSink.getDataStreamSinks().get(
                     multiCastDataSink.getDataStreamSinks().size() - 1);
-            if (!(distribute.child() instanceof PhysicalProject)) {
+            if (!(child instanceof PhysicalProject)) {
                 List<Expr> projectionExprs = new ArrayList<>();
                 PhysicalCTEConsumer consumer = getCTEConsumerChild(distribute);
                 Preconditions.checkState(consumer != null, "consumer not 
found");
@@ -1591,7 +1595,8 @@ public class PhysicalPlanTranslator extends 
DefaultPlanVisitor<PlanFragment, Pla
         PlanFragment inputFragment = physicalLimit.child(0).accept(this, 
context);
         PlanNode child = inputFragment.getPlanRoot();
         child.setLimit(MergeLimits.mergeLimit(physicalLimit.getLimit(), 
physicalLimit.getOffset(), child.getLimit()));
-        child.setOffset(MergeLimits.mergeOffset(physicalLimit.getOffset(), 
child.getOffset()));
+        // TODO: plan node don't support limit
+        // child.setOffset(MergeLimits.mergeOffset(physicalLimit.getOffset(), 
child.getOffset()));
         updateLegacyPlanIdToPhysicalPlan(child, physicalLimit);
         return inputFragment;
     }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/AddOffsetIntoDistribute.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/AddOffsetIntoDistribute.java
new file mode 100644
index 00000000000..deac3988698
--- /dev/null
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/AddOffsetIntoDistribute.java
@@ -0,0 +1,41 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.nereids.processor.post;
+
+import org.apache.doris.nereids.CascadesContext;
+import org.apache.doris.nereids.properties.DistributionSpecGather;
+import org.apache.doris.nereids.trees.plans.Plan;
+import org.apache.doris.nereids.trees.plans.physical.PhysicalDistribute;
+import org.apache.doris.nereids.trees.plans.physical.PhysicalLimit;
+
+/**
+ * Offset just can be in exchangeNode.
+ * So, `offset` action is after `limit` action.
+ * So, `limit` should update with `offset + limit`
+ */
+public class AddOffsetIntoDistribute extends PlanPostProcessor {
+    @Override
+    public Plan visitPhysicalLimit(PhysicalLimit<? extends Plan> limit, 
CascadesContext context) {
+        if (limit.getPhase().isLocal() || limit.getOffset() == 0) {
+            return limit;
+        }
+
+        return new PhysicalDistribute<>(DistributionSpecGather.INSTANCE,
+                limit.withLimit(limit.getLimit() + limit.getOffset()));
+    }
+}
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/PlanPostProcessors.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/PlanPostProcessors.java
index 17538d55d45..6d85bebb2d9 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/PlanPostProcessors.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/PlanPostProcessors.java
@@ -61,6 +61,7 @@ public class PlanPostProcessors {
         builder.add(new PushDownFilterThroughProject());
         builder.add(new MergeProjectPostProcessor());
         builder.add(new RecomputeLogicalPropertiesProcessor());
+        builder.add(new AddOffsetIntoDistribute());
         builder.add(new TopNScanOpt());
         // after generate rf, DO NOT replace PLAN NODE
         builder.add(new FragmentProcessor());
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalLimit.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalLimit.java
index ab7f5f811eb..bbc2143df61 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalLimit.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalLimit.java
@@ -101,6 +101,11 @@ public class PhysicalLimit<CHILD_TYPE extends Plan> 
extends PhysicalUnary<CHILD_
         return phase == LimitPhase.GLOBAL;
     }
 
+    public Plan withLimit(long limit) {
+        return new PhysicalLimit<>(limit, offset, phase, groupExpression, 
getLogicalProperties(),
+                physicalProperties, statistics, children.get(0));
+    }
+
     @Override
     public Plan withChildren(List<Plan> children) {
         Preconditions.checkArgument(children.size() == 1);


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to