This is an automated email from the ASF dual-hosted git repository. jakevin pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push: new 0882223dcb6 [fix](Nereids): fix offset in PlanTranslator (#29789) 0882223dcb6 is described below commit 0882223dcb6054975e02e76223db94cc533947df Author: jakevin <jakevin...@gmail.com> AuthorDate: Mon Jan 15 17:34:08 2024 +0800 [fix](Nereids): fix offset in PlanTranslator (#29789) Current BE operator don't support `offset`, we need add offset into `ExchangeNode` --- .../glue/translator/PhysicalPlanTranslator.java | 23 +++++++----- .../processor/post/AddOffsetIntoDistribute.java | 41 ++++++++++++++++++++++ .../nereids/processor/post/PlanPostProcessors.java | 1 + .../trees/plans/physical/PhysicalLimit.java | 5 +++ 4 files changed, 61 insertions(+), 9 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java index 6270c5795b3..b20d0738140 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java @@ -268,13 +268,14 @@ public class PhysicalPlanTranslator extends DefaultPlanVisitor<PlanFragment, Pla @Override public PlanFragment visitPhysicalDistribute(PhysicalDistribute<? extends Plan> distribute, PlanTranslatorContext context) { - PlanFragment inputFragment = distribute.child().accept(this, context); - List<List<Expr>> distributeExprLists = getDistributeExprs(distribute.child()); + Plan child = distribute.child(); + PlanFragment inputFragment = child.accept(this, context); + List<List<Expr>> distributeExprLists = getDistributeExprs(child); // TODO: why need set streaming here? should remove this. if (inputFragment.getPlanRoot() instanceof AggregationNode - && distribute.child() instanceof PhysicalHashAggregate - && context.getFirstAggregateInFragment(inputFragment) == distribute.child()) { - PhysicalHashAggregate<?> hashAggregate = (PhysicalHashAggregate<?>) distribute.child(); + && child instanceof PhysicalHashAggregate + && context.getFirstAggregateInFragment(inputFragment) == child) { + PhysicalHashAggregate<?> hashAggregate = (PhysicalHashAggregate<?>) child; if (hashAggregate.getAggPhase() == AggPhase.LOCAL && hashAggregate.getAggMode() == AggMode.INPUT_TO_BUFFER) { AggregationNode aggregationNode = (AggregationNode) inputFragment.getPlanRoot(); @@ -285,23 +286,26 @@ public class PhysicalPlanTranslator extends DefaultPlanVisitor<PlanFragment, Pla ExchangeNode exchangeNode = new ExchangeNode(context.nextPlanNodeId(), inputFragment.getPlanRoot()); updateLegacyPlanIdToPhysicalPlan(exchangeNode, distribute); List<ExprId> validOutputIds = distribute.getOutputExprIds(); - if (distribute.child() instanceof PhysicalHashAggregate) { + if (child instanceof PhysicalHashAggregate) { // we must add group by keys to output list, // otherwise we could not process aggregate's output without group by keys - List<ExprId> keys = ((PhysicalHashAggregate<?>) distribute.child()).getGroupByExpressions().stream() + List<ExprId> keys = ((PhysicalHashAggregate<?>) child).getGroupByExpressions().stream() .filter(SlotReference.class::isInstance) .map(SlotReference.class::cast) .map(SlotReference::getExprId) .collect(Collectors.toList()); keys.addAll(validOutputIds); validOutputIds = keys; + } else if (child instanceof PhysicalLimit && ((PhysicalLimit<?>) child).getPhase().isGlobal()) { + // because sort already contains Offset, we don't need to handle PhysicalTopN + exchangeNode.setOffset(((PhysicalLimit<?>) child).getOffset()); } if (inputFragment instanceof MultiCastPlanFragment) { // TODO: remove this logic when we split to multi-window in logical window to physical window conversion MultiCastDataSink multiCastDataSink = (MultiCastDataSink) inputFragment.getSink(); DataStreamSink dataStreamSink = multiCastDataSink.getDataStreamSinks().get( multiCastDataSink.getDataStreamSinks().size() - 1); - if (!(distribute.child() instanceof PhysicalProject)) { + if (!(child instanceof PhysicalProject)) { List<Expr> projectionExprs = new ArrayList<>(); PhysicalCTEConsumer consumer = getCTEConsumerChild(distribute); Preconditions.checkState(consumer != null, "consumer not found"); @@ -1591,7 +1595,8 @@ public class PhysicalPlanTranslator extends DefaultPlanVisitor<PlanFragment, Pla PlanFragment inputFragment = physicalLimit.child(0).accept(this, context); PlanNode child = inputFragment.getPlanRoot(); child.setLimit(MergeLimits.mergeLimit(physicalLimit.getLimit(), physicalLimit.getOffset(), child.getLimit())); - child.setOffset(MergeLimits.mergeOffset(physicalLimit.getOffset(), child.getOffset())); + // TODO: plan node don't support limit + // child.setOffset(MergeLimits.mergeOffset(physicalLimit.getOffset(), child.getOffset())); updateLegacyPlanIdToPhysicalPlan(child, physicalLimit); return inputFragment; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/AddOffsetIntoDistribute.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/AddOffsetIntoDistribute.java new file mode 100644 index 00000000000..deac3988698 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/AddOffsetIntoDistribute.java @@ -0,0 +1,41 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.nereids.processor.post; + +import org.apache.doris.nereids.CascadesContext; +import org.apache.doris.nereids.properties.DistributionSpecGather; +import org.apache.doris.nereids.trees.plans.Plan; +import org.apache.doris.nereids.trees.plans.physical.PhysicalDistribute; +import org.apache.doris.nereids.trees.plans.physical.PhysicalLimit; + +/** + * Offset just can be in exchangeNode. + * So, `offset` action is after `limit` action. + * So, `limit` should update with `offset + limit` + */ +public class AddOffsetIntoDistribute extends PlanPostProcessor { + @Override + public Plan visitPhysicalLimit(PhysicalLimit<? extends Plan> limit, CascadesContext context) { + if (limit.getPhase().isLocal() || limit.getOffset() == 0) { + return limit; + } + + return new PhysicalDistribute<>(DistributionSpecGather.INSTANCE, + limit.withLimit(limit.getLimit() + limit.getOffset())); + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/PlanPostProcessors.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/PlanPostProcessors.java index 17538d55d45..6d85bebb2d9 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/PlanPostProcessors.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/PlanPostProcessors.java @@ -61,6 +61,7 @@ public class PlanPostProcessors { builder.add(new PushDownFilterThroughProject()); builder.add(new MergeProjectPostProcessor()); builder.add(new RecomputeLogicalPropertiesProcessor()); + builder.add(new AddOffsetIntoDistribute()); builder.add(new TopNScanOpt()); // after generate rf, DO NOT replace PLAN NODE builder.add(new FragmentProcessor()); diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalLimit.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalLimit.java index ab7f5f811eb..bbc2143df61 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalLimit.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalLimit.java @@ -101,6 +101,11 @@ public class PhysicalLimit<CHILD_TYPE extends Plan> extends PhysicalUnary<CHILD_ return phase == LimitPhase.GLOBAL; } + public Plan withLimit(long limit) { + return new PhysicalLimit<>(limit, offset, phase, groupExpression, getLogicalProperties(), + physicalProperties, statistics, children.get(0)); + } + @Override public Plan withChildren(List<Plan> children) { Preconditions.checkArgument(children.size() == 1); --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org