This is an automated email from the ASF dual-hosted git repository. yiguolei pushed a commit to branch 2.1-tmp in repository https://gitbox.apache.org/repos/asf/doris.git
commit cc363f26c26515fe4c06cac0f9fca9b0500cc045 Author: 924060929 <924060...@qq.com> AuthorDate: Mon Apr 1 21:28:39 2024 +0800 [fix](Nereids) fix group concat (#33091) Fix the failure in regression_test/suites/query_p0/group_concat/test_group_concat.groovy: select group_concat( distinct b1, '?'), group_concat( distinct b3, '?') from table_group_concat group by b2 exception: lowestCostPlans with physicalProperties(GATHER) doesn't exist in root group. The root cause is that '?' is pushed down to a slot by NormalizeAggregate; AggregateStrategies then treats the slot as a distinct parameter and generates an invalid PhysicalHashAggregate, which is rejected by ChildOutputPropertyDeriver. I fix this bug by avoiding pushing literals down to slots in NormalizeAggregate, and by forbidding generation of a stream aggregate node when the group by slots are empty. --- be/src/pipeline/pipeline_fragment_context.cpp | 9 ++++-- .../pipeline_x/pipeline_x_fragment_context.cpp | 11 ++++++-- be/src/runtime/descriptors.h | 5 ++++ be/src/vec/exec/vaggregation_node.h | 1 + .../java/org/apache/doris/nereids/memo/Group.java | 22 +++++++++++++-- .../apache/doris/nereids/memo/GroupExpression.java | 5 ++++ .../properties/ChildrenPropertiesRegulator.java | 4 ++- .../nereids/properties/PhysicalProperties.java | 6 ++-- .../nereids/rules/analysis/NormalizeAggregate.java | 9 +++++- .../rules/implementation/AggregateStrategies.java | 32 ++++++++++++++++++++++ 10 files changed, 93 insertions(+), 11 deletions(-) diff --git a/be/src/pipeline/pipeline_fragment_context.cpp b/be/src/pipeline/pipeline_fragment_context.cpp index c273a0c3807..a32d777788d 100644 --- a/be/src/pipeline/pipeline_fragment_context.cpp +++ b/be/src/pipeline/pipeline_fragment_context.cpp @@ -559,7 +559,12 @@ Status PipelineFragmentContext::_build_pipelines(ExecNode* node, PipelinePtr cur auto* agg_node = dynamic_cast<vectorized::AggregationNode*>(node); auto new_pipe = add_pipeline(); RETURN_IF_ERROR(_build_pipelines(node->child(0), new_pipe)); - if 
(agg_node->is_aggregate_evaluators_empty()) { + if (agg_node->is_probe_expr_ctxs_empty() && node->row_desc().num_slots() == 0) { + return Status::InternalError("Illegal aggregate node " + + std::to_string(agg_node->id()) + + ": group by and output is empty"); + } + if (agg_node->is_aggregate_evaluators_empty() && !agg_node->is_probe_expr_ctxs_empty()) { auto data_queue = std::make_shared<DataQueue>(1); OperatorBuilderPtr pre_agg_sink = std::make_shared<DistinctStreamingAggSinkOperatorBuilder>(node->id(), agg_node, @@ -570,7 +575,7 @@ Status PipelineFragmentContext::_build_pipelines(ExecNode* node, PipelinePtr cur std::make_shared<DistinctStreamingAggSourceOperatorBuilder>( node->id(), agg_node, data_queue); RETURN_IF_ERROR(cur_pipe->add_operator(pre_agg_source)); - } else if (agg_node->is_streaming_preagg()) { + } else if (agg_node->is_streaming_preagg() && !agg_node->is_probe_expr_ctxs_empty()) { auto data_queue = std::make_shared<DataQueue>(1); OperatorBuilderPtr pre_agg_sink = std::make_shared<StreamingAggSinkOperatorBuilder>( node->id(), agg_node, data_queue); diff --git a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp index 744ce754a59..5dac71e8420 100644 --- a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp +++ b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp @@ -989,13 +989,20 @@ Status PipelineXFragmentContext::_create_operator(ObjectPool* pool, const TPlanN break; } case TPlanNodeType::AGGREGATION_NODE: { + if (tnode.agg_node.grouping_exprs.empty() && + descs.get_tuple_descriptor(tnode.agg_node.output_tuple_id)->slots().empty()) { + return Status::InternalError("Illegal aggregate node " + std::to_string(tnode.node_id) + + ": group by and output is empty"); + } if (tnode.agg_node.aggregate_functions.empty() && !_runtime_state->enable_agg_spill() && request.query_options.__isset.enable_distinct_streaming_aggregation && - 
request.query_options.enable_distinct_streaming_aggregation) { + request.query_options.enable_distinct_streaming_aggregation && + !tnode.agg_node.grouping_exprs.empty()) { op.reset(new DistinctStreamingAggOperatorX(pool, next_operator_id(), tnode, descs)); RETURN_IF_ERROR(cur_pipe->add_operator(op)); } else if (tnode.agg_node.__isset.use_streaming_preaggregation && - tnode.agg_node.use_streaming_preaggregation) { + tnode.agg_node.use_streaming_preaggregation && + !tnode.agg_node.grouping_exprs.empty()) { op.reset(new StreamingAggOperatorX(pool, next_operator_id(), tnode, descs)); RETURN_IF_ERROR(cur_pipe->add_operator(op)); } else { diff --git a/be/src/runtime/descriptors.h b/be/src/runtime/descriptors.h index fff1ed339d5..7cb7e9fe015 100644 --- a/be/src/runtime/descriptors.h +++ b/be/src/runtime/descriptors.h @@ -505,10 +505,12 @@ public: _has_varlen_slots(desc._has_varlen_slots) { _num_materialized_slots = 0; _num_null_slots = 0; + _num_slots = 0; std::vector<TupleDescriptor*>::const_iterator it = desc._tuple_desc_map.begin(); for (; it != desc._tuple_desc_map.end(); ++it) { _num_materialized_slots += (*it)->num_materialized_slots(); _num_null_slots += (*it)->num_null_slots(); + _num_slots += (*it)->slots().size(); } _num_null_bytes = (_num_null_slots + 7) / 8; } @@ -531,6 +533,8 @@ public: int num_null_bytes() const { return _num_null_bytes; } + int num_slots() const { return _num_slots; } + static const int INVALID_IDX; // Returns INVALID_IDX if id not part of this row. 
@@ -585,6 +589,7 @@ private: int _num_materialized_slots; int _num_null_slots; int _num_null_bytes; + int _num_slots; }; } // namespace doris diff --git a/be/src/vec/exec/vaggregation_node.h b/be/src/vec/exec/vaggregation_node.h index f09ebcbba83..f89bbb9d780 100644 --- a/be/src/vec/exec/vaggregation_node.h +++ b/be/src/vec/exec/vaggregation_node.h @@ -416,6 +416,7 @@ public: Status pull(doris::RuntimeState* state, vectorized::Block* output_block, bool* eos) override; Status sink(doris::RuntimeState* state, vectorized::Block* input_block, bool eos) override; Status do_pre_agg(vectorized::Block* input_block, vectorized::Block* output_block); + bool is_probe_expr_ctxs_empty() const { return _probe_expr_ctxs.empty(); } bool is_streaming_preagg() const { return _is_streaming_preagg; } bool is_aggregate_evaluators_empty() const { return _aggregate_evaluators.empty(); } void _make_nullable_output_key(Block* block); diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/memo/Group.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/memo/Group.java index 5a5abd56f95..01968a03bef 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/memo/Group.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/memo/Group.java @@ -35,6 +35,7 @@ import org.apache.doris.statistics.Statistics; import com.google.common.base.Preconditions; import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; import com.google.common.collect.Lists; import com.google.common.collect.Maps; @@ -65,7 +66,7 @@ public class Group { // Map of cost lower bounds // Map required plan props to cost lower bound of corresponding plan - private final Map<PhysicalProperties, Pair<Cost, GroupExpression>> lowestCostPlans = Maps.newHashMap(); + private final Map<PhysicalProperties, Pair<Cost, GroupExpression>> lowestCostPlans = Maps.newLinkedHashMap(); private boolean isExplored = false; @@ -228,6 +229,12 @@ public class Group { return costAndGroupExpression; 
} + public Map<PhysicalProperties, Cost> getLowestCosts() { + return lowestCostPlans.entrySet() + .stream() + .collect(ImmutableMap.toImmutableMap(Entry::getKey, kv -> kv.getValue().first)); + } + public GroupExpression getBestPlan(PhysicalProperties properties) { if (lowestCostPlans.containsKey(properties)) { return lowestCostPlans.get(properties).second; @@ -489,9 +496,18 @@ public class Group { public String treeString() { Function<Object, String> toString = obj -> { if (obj instanceof Group) { - return "Group[" + ((Group) obj).groupId + "]"; + Group group = (Group) obj; + Map<PhysicalProperties, Cost> lowestCosts = group.getLowestCosts(); + return "Group[" + group.groupId + ", lowestCosts: " + lowestCosts + "]"; } else if (obj instanceof GroupExpression) { - return ((GroupExpression) obj).getPlan().toString(); + GroupExpression groupExpression = (GroupExpression) obj; + Map<PhysicalProperties, Pair<Cost, List<PhysicalProperties>>> lowestCostTable + = groupExpression.getLowestCostTable(); + Map<PhysicalProperties, PhysicalProperties> requestPropertiesMap + = groupExpression.getRequestPropertiesMap(); + Cost cost = groupExpression.getCost(); + return groupExpression.getPlan().toString() + " [cost: " + cost + ", lowestCostTable: " + + lowestCostTable + ", requestPropertiesMap: " + requestPropertiesMap + "]"; } else if (obj instanceof Pair) { // print logicalExpressions or physicalExpressions // first is name, second is group expressions diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/memo/GroupExpression.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/memo/GroupExpression.java index eda7f5c9c35..24bc9383b52 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/memo/GroupExpression.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/memo/GroupExpression.java @@ -35,6 +35,7 @@ import org.apache.doris.statistics.Statistics; import com.google.common.base.Joiner; import com.google.common.base.Preconditions; import 
com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; import com.google.common.collect.Lists; import com.google.common.collect.Maps; @@ -318,6 +319,10 @@ public class GroupExpression { this.estOutputRowCount = estOutputRowCount; } + public Map<PhysicalProperties, PhysicalProperties> getRequestPropertiesMap() { + return ImmutableMap.copyOf(requestPropertiesMap); + } + @Override public String toString() { DecimalFormat format = new DecimalFormat("#,###.##"); diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/ChildrenPropertiesRegulator.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/ChildrenPropertiesRegulator.java index 7c5374ebd21..366730f7dc5 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/ChildrenPropertiesRegulator.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/ChildrenPropertiesRegulator.java @@ -104,6 +104,9 @@ public class ChildrenPropertiesRegulator extends PlanVisitor<Boolean, Void> { @Override public Boolean visitPhysicalHashAggregate(PhysicalHashAggregate<? 
extends Plan> agg, Void context) { + if (agg.getGroupByExpressions().isEmpty() && agg.getOutputExpressions().isEmpty()) { + return false; + } if (!agg.getAggregateParam().canBeBanned) { return true; } @@ -121,7 +124,6 @@ public class ChildrenPropertiesRegulator extends PlanVisitor<Boolean, Void> { return true; } return false; - } // forbid TWO_PHASE_AGGREGATE_WITH_DISTINCT after shuffle diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/PhysicalProperties.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/PhysicalProperties.java index 81e7190e163..031f18ab918 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/PhysicalProperties.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/PhysicalProperties.java @@ -88,11 +88,13 @@ public class PhysicalProperties { .map(SlotReference.class::cast) .map(SlotReference::getExprId) .collect(Collectors.toList()); - return createHash(partitionedSlots, shuffleType); + return partitionedSlots.isEmpty() ? PhysicalProperties.GATHER : createHash(partitionedSlots, shuffleType); } public static PhysicalProperties createHash(List<ExprId> orderedShuffledColumns, ShuffleType shuffleType) { - return new PhysicalProperties(new DistributionSpecHash(orderedShuffledColumns, shuffleType)); + return orderedShuffledColumns.isEmpty() + ? 
PhysicalProperties.GATHER + : new PhysicalProperties(new DistributionSpecHash(orderedShuffledColumns, shuffleType)); } public static PhysicalProperties createHash(DistributionSpecHash distributionSpecHash) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/analysis/NormalizeAggregate.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/analysis/NormalizeAggregate.java index 7f6df51248e..e9b3d32da6e 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/analysis/NormalizeAggregate.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/analysis/NormalizeAggregate.java @@ -31,6 +31,7 @@ import org.apache.doris.nereids.trees.expressions.SlotReference; import org.apache.doris.nereids.trees.expressions.SubqueryExpr; import org.apache.doris.nereids.trees.expressions.WindowExpression; import org.apache.doris.nereids.trees.expressions.functions.agg.AggregateFunction; +import org.apache.doris.nereids.trees.expressions.literal.Literal; import org.apache.doris.nereids.trees.plans.Plan; import org.apache.doris.nereids.trees.plans.logical.LogicalAggregate; import org.apache.doris.nereids.trees.plans.logical.LogicalHaving; @@ -152,6 +153,9 @@ public class NormalizeAggregate implements RewriteRuleFactory, NormalizeToSlot { Map<Boolean, ImmutableSet<Expression>> categorizedNoDistinctAggsChildren = aggFuncs.stream() .filter(aggFunc -> !aggFunc.isDistinct()) .flatMap(agg -> agg.children().stream()) + // should not push down literal under aggregate + // e.g. 
group_concat(distinct xxx, ','), the ',' literal show stay in aggregate + .filter(arg -> !(arg instanceof Literal)) .collect(Collectors.groupingBy( child -> child.containsType(SubqueryExpr.class, WindowExpression.class), ImmutableSet.toImmutableSet())); @@ -159,9 +163,12 @@ public class NormalizeAggregate implements RewriteRuleFactory, NormalizeToSlot { // split distinct agg child as two parts // TRUE part 1: need push down itself, if it is NOT SlotReference or Literal // FALSE part 2: need push down its input slots, if it is SlotReference or Literal - Map<Object, ImmutableSet<Expression>> categorizedDistinctAggsChildren = aggFuncs.stream() + Map<Boolean, ImmutableSet<Expression>> categorizedDistinctAggsChildren = aggFuncs.stream() .filter(AggregateFunction::isDistinct) .flatMap(agg -> agg.children().stream()) + // should not push down literal under aggregate + // e.g. group_concat(distinct xxx, ','), the ',' literal show stay in aggregate + .filter(arg -> !(arg instanceof Literal)) .collect( Collectors.groupingBy( child -> !(child instanceof SlotReference), diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/implementation/AggregateStrategies.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/implementation/AggregateStrategies.java index 61aac4d2407..edbd28677b4 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/implementation/AggregateStrategies.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/implementation/AggregateStrategies.java @@ -70,6 +70,7 @@ import org.apache.doris.nereids.trees.plans.physical.PhysicalHashAggregate; import org.apache.doris.nereids.trees.plans.physical.PhysicalOlapScan; import org.apache.doris.nereids.trees.plans.physical.PhysicalStorageLayerAggregate; import org.apache.doris.nereids.trees.plans.physical.PhysicalStorageLayerAggregate.PushDownAggOp; +import org.apache.doris.nereids.types.TinyIntType; import org.apache.doris.nereids.util.ExpressionUtils; import 
org.apache.doris.nereids.util.TypeCoercionUtils; import org.apache.doris.qe.ConnectContext; @@ -1292,6 +1293,15 @@ public class AggregateStrategies implements ImplementationRuleFactory { .build(); List<Expression> localAggGroupBy = ImmutableList.copyOf(localAggGroupBySet); + boolean isGroupByEmptySelectEmpty = localAggGroupBy.isEmpty() && localAggOutput.isEmpty(); + + // be not recommend generate an aggregate node with empty group by and empty output, + // so add a null int slot to group by slot and output + if (isGroupByEmptySelectEmpty) { + localAggGroupBy = ImmutableList.of(new NullLiteral(TinyIntType.INSTANCE)); + localAggOutput = ImmutableList.of(new Alias(new NullLiteral(TinyIntType.INSTANCE))); + } + boolean maybeUsingStreamAgg = maybeUsingStreamAgg(connectContext, localAggGroupBy); List<Expression> partitionExpressions = getHashAggregatePartitionExpressions(logicalAgg); RequireProperties requireAny = RequireProperties.of(PhysicalProperties.ANY); @@ -1317,6 +1327,12 @@ public class AggregateStrategies implements ImplementationRuleFactory { .addAll(nonDistinctAggFunctionToAliasPhase2.values()) .build(); + // be not recommend generate an aggregate node with empty group by and empty output, + // so add a null int slot to group by slot and output + if (isGroupByEmptySelectEmpty) { + globalAggOutput = ImmutableList.of(new Alias(new NullLiteral(TinyIntType.INSTANCE))); + } + RequireProperties requireGather = RequireProperties.of(PhysicalProperties.GATHER); PhysicalHashAggregate<Plan> anyLocalGatherGlobalAgg = new PhysicalHashAggregate<>( localAggGroupBy, globalAggOutput, Optional.of(partitionExpressions), @@ -1680,6 +1696,16 @@ public class AggregateStrategies implements ImplementationRuleFactory { boolean maybeUsingStreamAgg = maybeUsingStreamAgg(connectContext, localAggGroupBy); List<Expression> partitionExpressions = getHashAggregatePartitionExpressions(logicalAgg); RequireProperties requireAny = RequireProperties.of(PhysicalProperties.ANY); + + boolean 
isGroupByEmptySelectEmpty = localAggGroupBy.isEmpty() && localAggOutput.isEmpty(); + + // be not recommend generate an aggregate node with empty group by and empty output, + // so add a null int slot to group by slot and output + if (isGroupByEmptySelectEmpty) { + localAggGroupBy = ImmutableList.of(new NullLiteral(TinyIntType.INSTANCE)); + localAggOutput = ImmutableList.of(new Alias(new NullLiteral(TinyIntType.INSTANCE))); + } + PhysicalHashAggregate<Plan> anyLocalAgg = new PhysicalHashAggregate<>(localAggGroupBy, localAggOutput, Optional.of(partitionExpressions), inputToBufferParam, maybeUsingStreamAgg, Optional.empty(), logicalAgg.getLogicalProperties(), @@ -1702,6 +1728,12 @@ public class AggregateStrategies implements ImplementationRuleFactory { .addAll(nonDistinctAggFunctionToAliasPhase2.values()) .build(); + // be not recommend generate an aggregate node with empty group by and empty output, + // so add a null int slot to group by slot and output + if (isGroupByEmptySelectEmpty) { + globalAggOutput = ImmutableList.of(new Alias(new NullLiteral(TinyIntType.INSTANCE))); + } + RequireProperties requireGather = RequireProperties.of(PhysicalProperties.GATHER); RequireProperties requireDistinctHash = RequireProperties.of( --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org