morrySnow commented on code in PR #9993: URL: https://github.com/apache/incubator-doris/pull/9993#discussion_r891120514
########## fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/PhysicalPlanTranslator.java: ##########
@@ -0,0 +1,309 @@ (excerpt: visitPhysicalAggregationPlan, FIRST_MERGE branch)
+            case FIRST_MERGE:
+                aggInfo = AggregateInfo.create(execGroupingExpressions, execAggExpressions, outputTupleDesc,
+                        outputTupleDesc, AggregateInfo.AggPhase.FIRST_MERGE, context.getAnalyzer());
+                aggregationNode = new AggregationNode(context.nextNodeId(), inputPlanFragment.getPlanRoot(), aggInfo);
+                break;

Review Comment: Do we need an exchange node under the agg merge node?

########## fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/Plan.java: ##########
@@ -44,4 +44,6 @@
     @Override
     Plan child(int index);
+

Review Comment: Remove the extra blank lines.

########## fe/fe-core/src/main/java/org/apache/doris/planner/ExchangeNode.java: ##########
@@ -64,6 +64,8 @@ public class ExchangeNode extends PlanNode {
     // only if mergeInfo_ is non-null, i.e. this is a merging exchange node.
     private long offset;
+    private DataPartition dataPartition;

Review Comment: Why was this field added?
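Regarding the first comment above (whether the FIRST_MERGE aggregation needs an exchange node beneath it), here is a minimal sketch of one way the translator could introduce one. It assumes the createParentFragment helper used by the sort translation can be reused for aggregation; it is an illustration against the PR's code, not the PR's actual implementation.

    // Sketch only: place the merge aggregation in a new parent fragment whose root is an
    // ExchangeNode, hash-partitioned on the aggregation's partition expressions.
    case FIRST_MERGE: {
        aggInfo = AggregateInfo.create(execGroupingExpressions, execAggExpressions, outputTupleDesc,
                outputTupleDesc, AggregateInfo.AggPhase.FIRST_MERGE, context.getAnalyzer());
        PlanFragment mergeFragment = createParentFragment(inputPlanFragment,
                DataPartition.hashPartitioned(execPartitionExpressions), context); // assumed reusable helper
        aggregationNode = new AggregationNode(context.nextNodeId(), mergeFragment.getPlanRoot(), aggInfo);
        mergeFragment.setPlanRoot(aggregationNode);
        return mergeFragment;
    }

The key point of the sketch is that the merge-phase node then consumes the exchange's output rather than sitting directly on top of the child fragment's root.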
########## fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/PhysicalPlanTranslator.java: ##########
@@ -0,0 +1,309 @@ (excerpt: visitPhysicalHashJoinPlan)
+        List<Expr> execEqConjunctList = eqExprList.stream().map(e -> ExpressionConverter.converter.convert(e))
+                .collect(Collectors.toCollection(ArrayList::new));
+
+        HashJoinNode hashJoinNode = new HashJoinNode(context.nextNodeId(), leftFragmentPlanRoot, rightFragmentPlanRoot,
+                JoinType.toJoinOperator(physicalHashJoin.getJoinType()), execEqConjunctList, execOtherConjunctList);

Review Comment: Add a TODO: add broadcast join, etc. later.
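A minimal illustration of such a TODO against the PR's join translation. The BROADCAST distribution mode and the cost-based decision are assumptions here, not something the PR implements.

    // TODO(nereids): support broadcast join in addition to the partitioned (shuffle) join below.
    // Sketch only -- assumes HashJoinNode.DistributionMode also exposes a BROADCAST value and that
    // the broadcast-vs-shuffle choice will eventually be made by the optimizer, e.g. from the
    // estimated size of the right side.
    boolean broadcastRightSide = false; // hypothetical placeholder for a cost-based decision
    hashJoinNode.setDistributionMode(broadcastRightSide
            ? HashJoinNode.DistributionMode.BROADCAST
            : HashJoinNode.DistributionMode.PARTITIONED);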
########## fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/PhysicalPlanTranslator.java: ##########
@@ -0,0 +1,309 @@ (excerpt: visitPhysicalSortPlan)
+    @Override
+    public PlanFragment visitPhysicalSortPlan(
+            PhysicalPlan<? extends PhysicalPlan, ? extends PhysicalOperator> physicalPlan, PlanContext context) {
+        PlanFragment childFragment = visit(
+                (PhysicalPlan<? extends PhysicalPlan, ? extends PhysicalOperator>) physicalPlan.child(0), context);
+        PhysicalSort physicalSort = (PhysicalSort) physicalPlan.getOperator();
+        if (!childFragment.isPartitioned()) {
+            return childFragment;
+        }

Review Comment: Why can we just return the child fragment here?
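One possible reading of this concern, sketched against the PR's code: even when the child fragment is not partitioned, the sort itself presumably still needs a SortNode; only the merge exchange can be skipped. The sketch assumes sortInfo would then have to be built before this check, the same way the partitioned branch builds it.

    // Sketch only, not the PR's implementation: keep a local SortNode for an unpartitioned child
    // and skip only the merge fragment / exchange that the partitioned case requires.
    if (!childFragment.isPartitioned()) {
        SortNode localSortNode = new SortNode(context.nextNodeId(), childFragment.getPlanRoot(), sortInfo,
                physicalSort.isUseTopN(), physicalSort.hasLimit(), physicalSort.getOffset());
        childFragment.setPlanRoot(localSortNode);
        return childFragment;
    }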
########## fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/PhysicalPlanTranslator.java: ##########
@@ -0,0 +1,309 @@ (excerpt: visitPhysicalFilter)
+    @Override
+    public PlanFragment visitPhysicalFilter(
+            PhysicalPlan<? extends PhysicalPlan, ? extends PhysicalOperator> physicalPlan, PlanContext context) {
+        return visit((PhysicalPlan<? extends PhysicalPlan, ? extends PhysicalOperator>) physicalPlan.child(0), context);

Review Comment: Add the predicate to the child node's conjuncts.

########## fe/fe-core/src/main/java/org/apache/doris/nereids/operators/plans/physical/PhysicalOperator.java: ##########
@@ -29,4 +29,6 @@
  */
 public interface PhysicalOperator<TYPE extends PhysicalOperator<TYPE>> extends PlanOperator<TYPE> {
     List<Slot> computeOutputs(LogicalProperties logicalProperties, Plan... inputs);
+

Review Comment: Remove the extra blank lines.

########## fe/fe-core/src/main/java/org/apache/doris/nereids/operators/plans/physical/PhysicalSort.java: ##########
@@ -0,0 +1,75 @@ (excerpt)
+public class PhysicalSort extends PhysicalUnaryOperator<PhysicalSort, PhysicalPlan> {
+
+    private int offset;
+
+    private int limit = -1;
+
+    private List<OrderKey> orderList;
+
+    // if true, the output of this node feeds an AnalyticNode
+    private boolean isAnalyticSort;

Review Comment: Currently we do not support analytic functions. In the old planner this is a trick flag; in Nereids it should no longer exist.
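For the visitPhysicalFilter comment above, a minimal sketch of attaching the filter predicate to the child node's conjuncts. PhysicalFilter and its getPredicate() accessor are assumptions here (the operator class is not part of the excerpt), and addConjuncts is assumed to be available on the generic PlanNode, as the cross-join branch already uses it.

    // Sketch only: translate the filter by pushing its predicate into the child plan root's
    // conjuncts instead of silently dropping it.
    PlanFragment childFragment = visit(
            (PhysicalPlan<? extends PhysicalPlan, ? extends PhysicalOperator>) physicalPlan.child(0), context);
    PhysicalFilter physicalFilter = (PhysicalFilter) physicalPlan.getOperator(); // assumed operator class
    List<Expr> conjuncts = Utils.extractConjuncts(physicalFilter.getPredicate()).stream() // getPredicate() assumed
            .map(e -> ExpressionConverter.converter.convert(e))
            .collect(Collectors.toList());
    childFragment.getPlanRoot().addConjuncts(conjuncts);
    return childFragment;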
########## fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/PhysicalPlanTranslator.java: ##########
@@ -0,0 +1,309 @@ (excerpt: visitPhysicalProject)
+    @Override
+    public PlanFragment visitPhysicalProject(
+            PhysicalPlan<? extends PhysicalPlan, ? extends PhysicalOperator> physicalPlan, PlanContext context) {
+        return visit((PhysicalPlan<? extends PhysicalPlan, ? extends PhysicalOperator>) physicalPlan.child(0), context);

Review Comment: We need to discuss how to process this.

########## fe/fe-core/src/main/java/org/apache/doris/nereids/operators/plans/physical/PhysicalOlapScan.java: ##########
@@ -34,6 +35,8 @@ public class PhysicalOlapScan extends PhysicalScan<PhysicalOlapScan> {
     private final List<Long> selectedTabletId;
     private final List<Long> selectedPartitionId;
+    private OlapTable olapTable;

Review Comment: Why move this from the parent class to the subclass?

########## fe/fe-core/src/main/java/org/apache/doris/nereids/operators/plans/physical/PhysicalAggregation.java: ##########
@@ -0,0 +1,82 @@ (excerpt)
+public class PhysicalAggregation extends PhysicalUnaryOperator<PhysicalAggregation, PhysicalPlan>{
+
+    private List<Expression> groupByExprList;
+
+    private List<Expression> aggExprList;
+
+    private List<Expression> partitionExprList;
+
+    private AggregateInfo.AggPhase aggPhase;

Review Comment: Maybe a new enum would be better.

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org