This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch 2.1-tmp
in repository https://gitbox.apache.org/repos/asf/doris.git

commit cc363f26c26515fe4c06cac0f9fca9b0500cc045
Author: 924060929 <924060...@qq.com>
AuthorDate: Mon Apr 1 21:28:39 2024 +0800

    [fix](Nereids) fix group concat (#33091)
    
    Fix a failure in regression_test/suites/query_p0/group_concat/test_group_concat.groovy:
    
    select
    group_concat( distinct b1, '?'), group_concat( distinct b3, '?')
    from
    table_group_concat
    group by
    b2
    
    exception:
    
    lowestCostPlans with physicalProperties(GATHER) doesn't exist in root group
    
    The root cause is that the literal '?' is pushed down into a slot by NormalizeAggregate.
    AggregateStrategies then treats that slot as a distinct parameter and generates an
    invalid PhysicalHashAggregate, which is subsequently rejected by ChildOutputPropertyDeriver.
    
    This commit fixes the bug in two ways. NormalizeAggregate no longer pushes literals down into slots.
    A streaming aggregate node is no longer generated when the group-by slots are empty.
---
 be/src/pipeline/pipeline_fragment_context.cpp      |  9 ++++--
 .../pipeline_x/pipeline_x_fragment_context.cpp     | 11 ++++++--
 be/src/runtime/descriptors.h                       |  5 ++++
 be/src/vec/exec/vaggregation_node.h                |  1 +
 .../java/org/apache/doris/nereids/memo/Group.java  | 22 +++++++++++++--
 .../apache/doris/nereids/memo/GroupExpression.java |  5 ++++
 .../properties/ChildrenPropertiesRegulator.java    |  4 ++-
 .../nereids/properties/PhysicalProperties.java     |  6 ++--
 .../nereids/rules/analysis/NormalizeAggregate.java |  9 +++++-
 .../rules/implementation/AggregateStrategies.java  | 32 ++++++++++++++++++++++
 10 files changed, 93 insertions(+), 11 deletions(-)

diff --git a/be/src/pipeline/pipeline_fragment_context.cpp 
b/be/src/pipeline/pipeline_fragment_context.cpp
index c273a0c3807..a32d777788d 100644
--- a/be/src/pipeline/pipeline_fragment_context.cpp
+++ b/be/src/pipeline/pipeline_fragment_context.cpp
@@ -559,7 +559,12 @@ Status PipelineFragmentContext::_build_pipelines(ExecNode* 
node, PipelinePtr cur
         auto* agg_node = dynamic_cast<vectorized::AggregationNode*>(node);
         auto new_pipe = add_pipeline();
         RETURN_IF_ERROR(_build_pipelines(node->child(0), new_pipe));
-        if (agg_node->is_aggregate_evaluators_empty()) {
+        if (agg_node->is_probe_expr_ctxs_empty() && 
node->row_desc().num_slots() == 0) {
+            return Status::InternalError("Illegal aggregate node " +
+                                         std::to_string(agg_node->id()) +
+                                         ": group by and output is empty");
+        }
+        if (agg_node->is_aggregate_evaluators_empty() && 
!agg_node->is_probe_expr_ctxs_empty()) {
             auto data_queue = std::make_shared<DataQueue>(1);
             OperatorBuilderPtr pre_agg_sink =
                     
std::make_shared<DistinctStreamingAggSinkOperatorBuilder>(node->id(), agg_node,
@@ -570,7 +575,7 @@ Status PipelineFragmentContext::_build_pipelines(ExecNode* 
node, PipelinePtr cur
                     
std::make_shared<DistinctStreamingAggSourceOperatorBuilder>(
                             node->id(), agg_node, data_queue);
             RETURN_IF_ERROR(cur_pipe->add_operator(pre_agg_source));
-        } else if (agg_node->is_streaming_preagg()) {
+        } else if (agg_node->is_streaming_preagg() && 
!agg_node->is_probe_expr_ctxs_empty()) {
             auto data_queue = std::make_shared<DataQueue>(1);
             OperatorBuilderPtr pre_agg_sink = 
std::make_shared<StreamingAggSinkOperatorBuilder>(
                     node->id(), agg_node, data_queue);
diff --git a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp 
b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp
index 744ce754a59..5dac71e8420 100644
--- a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp
+++ b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp
@@ -989,13 +989,20 @@ Status 
PipelineXFragmentContext::_create_operator(ObjectPool* pool, const TPlanN
         break;
     }
     case TPlanNodeType::AGGREGATION_NODE: {
+        if (tnode.agg_node.grouping_exprs.empty() &&
+            
descs.get_tuple_descriptor(tnode.agg_node.output_tuple_id)->slots().empty()) {
+            return Status::InternalError("Illegal aggregate node " + 
std::to_string(tnode.node_id) +
+                                         ": group by and output is empty");
+        }
         if (tnode.agg_node.aggregate_functions.empty() && 
!_runtime_state->enable_agg_spill() &&
             
request.query_options.__isset.enable_distinct_streaming_aggregation &&
-            request.query_options.enable_distinct_streaming_aggregation) {
+            request.query_options.enable_distinct_streaming_aggregation &&
+            !tnode.agg_node.grouping_exprs.empty()) {
             op.reset(new DistinctStreamingAggOperatorX(pool, 
next_operator_id(), tnode, descs));
             RETURN_IF_ERROR(cur_pipe->add_operator(op));
         } else if (tnode.agg_node.__isset.use_streaming_preaggregation &&
-                   tnode.agg_node.use_streaming_preaggregation) {
+                   tnode.agg_node.use_streaming_preaggregation &&
+                   !tnode.agg_node.grouping_exprs.empty()) {
             op.reset(new StreamingAggOperatorX(pool, next_operator_id(), 
tnode, descs));
             RETURN_IF_ERROR(cur_pipe->add_operator(op));
         } else {
diff --git a/be/src/runtime/descriptors.h b/be/src/runtime/descriptors.h
index fff1ed339d5..7cb7e9fe015 100644
--- a/be/src/runtime/descriptors.h
+++ b/be/src/runtime/descriptors.h
@@ -505,10 +505,12 @@ public:
               _has_varlen_slots(desc._has_varlen_slots) {
         _num_materialized_slots = 0;
         _num_null_slots = 0;
+        _num_slots = 0;
         std::vector<TupleDescriptor*>::const_iterator it = 
desc._tuple_desc_map.begin();
         for (; it != desc._tuple_desc_map.end(); ++it) {
             _num_materialized_slots += (*it)->num_materialized_slots();
             _num_null_slots += (*it)->num_null_slots();
+            _num_slots += (*it)->slots().size();
         }
         _num_null_bytes = (_num_null_slots + 7) / 8;
     }
@@ -531,6 +533,8 @@ public:
 
     int num_null_bytes() const { return _num_null_bytes; }
 
+    int num_slots() const { return _num_slots; }
+
     static const int INVALID_IDX;
 
     // Returns INVALID_IDX if id not part of this row.
@@ -585,6 +589,7 @@ private:
     int _num_materialized_slots;
     int _num_null_slots;
     int _num_null_bytes;
+    int _num_slots;
 };
 
 } // namespace doris
diff --git a/be/src/vec/exec/vaggregation_node.h 
b/be/src/vec/exec/vaggregation_node.h
index f09ebcbba83..f89bbb9d780 100644
--- a/be/src/vec/exec/vaggregation_node.h
+++ b/be/src/vec/exec/vaggregation_node.h
@@ -416,6 +416,7 @@ public:
     Status pull(doris::RuntimeState* state, vectorized::Block* output_block, 
bool* eos) override;
     Status sink(doris::RuntimeState* state, vectorized::Block* input_block, 
bool eos) override;
     Status do_pre_agg(vectorized::Block* input_block, vectorized::Block* 
output_block);
+    bool is_probe_expr_ctxs_empty() const { return _probe_expr_ctxs.empty(); }
     bool is_streaming_preagg() const { return _is_streaming_preagg; }
     bool is_aggregate_evaluators_empty() const { return 
_aggregate_evaluators.empty(); }
     void _make_nullable_output_key(Block* block);
diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/memo/Group.java 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/memo/Group.java
index 5a5abd56f95..01968a03bef 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/memo/Group.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/memo/Group.java
@@ -35,6 +35,7 @@ import org.apache.doris.statistics.Statistics;
 
 import com.google.common.base.Preconditions;
 import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 
@@ -65,7 +66,7 @@ public class Group {
 
     // Map of cost lower bounds
     // Map required plan props to cost lower bound of corresponding plan
-    private final Map<PhysicalProperties, Pair<Cost, GroupExpression>> 
lowestCostPlans = Maps.newHashMap();
+    private final Map<PhysicalProperties, Pair<Cost, GroupExpression>> 
lowestCostPlans = Maps.newLinkedHashMap();
 
     private boolean isExplored = false;
 
@@ -228,6 +229,12 @@ public class Group {
         return costAndGroupExpression;
     }
 
+    public Map<PhysicalProperties, Cost> getLowestCosts() {
+        return lowestCostPlans.entrySet()
+                .stream()
+                .collect(ImmutableMap.toImmutableMap(Entry::getKey, kv -> 
kv.getValue().first));
+    }
+
     public GroupExpression getBestPlan(PhysicalProperties properties) {
         if (lowestCostPlans.containsKey(properties)) {
             return lowestCostPlans.get(properties).second;
@@ -489,9 +496,18 @@ public class Group {
     public String treeString() {
         Function<Object, String> toString = obj -> {
             if (obj instanceof Group) {
-                return "Group[" + ((Group) obj).groupId + "]";
+                Group group = (Group) obj;
+                Map<PhysicalProperties, Cost> lowestCosts = 
group.getLowestCosts();
+                return "Group[" + group.groupId + ", lowestCosts: " + 
lowestCosts + "]";
             } else if (obj instanceof GroupExpression) {
-                return ((GroupExpression) obj).getPlan().toString();
+                GroupExpression groupExpression = (GroupExpression) obj;
+                Map<PhysicalProperties, Pair<Cost, List<PhysicalProperties>>> 
lowestCostTable
+                        = groupExpression.getLowestCostTable();
+                Map<PhysicalProperties, PhysicalProperties> 
requestPropertiesMap
+                        = groupExpression.getRequestPropertiesMap();
+                Cost cost = groupExpression.getCost();
+                return groupExpression.getPlan().toString() + " [cost: " + 
cost + ", lowestCostTable: "
+                        + lowestCostTable + ", requestPropertiesMap: " + 
requestPropertiesMap + "]";
             } else if (obj instanceof Pair) {
                 // print logicalExpressions or physicalExpressions
                 // first is name, second is group expressions
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/memo/GroupExpression.java 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/memo/GroupExpression.java
index eda7f5c9c35..24bc9383b52 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/memo/GroupExpression.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/memo/GroupExpression.java
@@ -35,6 +35,7 @@ import org.apache.doris.statistics.Statistics;
 import com.google.common.base.Joiner;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 
@@ -318,6 +319,10 @@ public class GroupExpression {
         this.estOutputRowCount = estOutputRowCount;
     }
 
+    public Map<PhysicalProperties, PhysicalProperties> 
getRequestPropertiesMap() {
+        return ImmutableMap.copyOf(requestPropertiesMap);
+    }
+
     @Override
     public String toString() {
         DecimalFormat format = new DecimalFormat("#,###.##");
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/ChildrenPropertiesRegulator.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/ChildrenPropertiesRegulator.java
index 7c5374ebd21..366730f7dc5 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/ChildrenPropertiesRegulator.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/ChildrenPropertiesRegulator.java
@@ -104,6 +104,9 @@ public class ChildrenPropertiesRegulator extends 
PlanVisitor<Boolean, Void> {
 
     @Override
     public Boolean visitPhysicalHashAggregate(PhysicalHashAggregate<? extends 
Plan> agg, Void context) {
+        if (agg.getGroupByExpressions().isEmpty() && 
agg.getOutputExpressions().isEmpty()) {
+            return false;
+        }
         if (!agg.getAggregateParam().canBeBanned) {
             return true;
         }
@@ -121,7 +124,6 @@ public class ChildrenPropertiesRegulator extends 
PlanVisitor<Boolean, Void> {
                 return true;
             }
             return false;
-
         }
 
         // forbid TWO_PHASE_AGGREGATE_WITH_DISTINCT after shuffle
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/PhysicalProperties.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/PhysicalProperties.java
index 81e7190e163..031f18ab918 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/PhysicalProperties.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/PhysicalProperties.java
@@ -88,11 +88,13 @@ public class PhysicalProperties {
                 .map(SlotReference.class::cast)
                 .map(SlotReference::getExprId)
                 .collect(Collectors.toList());
-        return createHash(partitionedSlots, shuffleType);
+        return partitionedSlots.isEmpty() ? PhysicalProperties.GATHER : 
createHash(partitionedSlots, shuffleType);
     }
 
     public static PhysicalProperties createHash(List<ExprId> 
orderedShuffledColumns, ShuffleType shuffleType) {
-        return new PhysicalProperties(new 
DistributionSpecHash(orderedShuffledColumns, shuffleType));
+        return orderedShuffledColumns.isEmpty()
+                ? PhysicalProperties.GATHER
+                : new PhysicalProperties(new 
DistributionSpecHash(orderedShuffledColumns, shuffleType));
     }
 
     public static PhysicalProperties createHash(DistributionSpecHash 
distributionSpecHash) {
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/analysis/NormalizeAggregate.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/analysis/NormalizeAggregate.java
index 7f6df51248e..e9b3d32da6e 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/analysis/NormalizeAggregate.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/analysis/NormalizeAggregate.java
@@ -31,6 +31,7 @@ import 
org.apache.doris.nereids.trees.expressions.SlotReference;
 import org.apache.doris.nereids.trees.expressions.SubqueryExpr;
 import org.apache.doris.nereids.trees.expressions.WindowExpression;
 import 
org.apache.doris.nereids.trees.expressions.functions.agg.AggregateFunction;
+import org.apache.doris.nereids.trees.expressions.literal.Literal;
 import org.apache.doris.nereids.trees.plans.Plan;
 import org.apache.doris.nereids.trees.plans.logical.LogicalAggregate;
 import org.apache.doris.nereids.trees.plans.logical.LogicalHaving;
@@ -152,6 +153,9 @@ public class NormalizeAggregate implements 
RewriteRuleFactory, NormalizeToSlot {
         Map<Boolean, ImmutableSet<Expression>> 
categorizedNoDistinctAggsChildren = aggFuncs.stream()
                 .filter(aggFunc -> !aggFunc.isDistinct())
                 .flatMap(agg -> agg.children().stream())
+                // should not push down literal under aggregate
+                // e.g. group_concat(distinct xxx, ','), the ',' literal show 
stay in aggregate
+                .filter(arg -> !(arg instanceof Literal))
                 .collect(Collectors.groupingBy(
                         child -> child.containsType(SubqueryExpr.class, 
WindowExpression.class),
                         ImmutableSet.toImmutableSet()));
@@ -159,9 +163,12 @@ public class NormalizeAggregate implements 
RewriteRuleFactory, NormalizeToSlot {
         // split distinct agg child as two parts
         // TRUE part 1: need push down itself, if it is NOT SlotReference or 
Literal
         // FALSE part 2: need push down its input slots, if it is 
SlotReference or Literal
-        Map<Object, ImmutableSet<Expression>> categorizedDistinctAggsChildren 
= aggFuncs.stream()
+        Map<Boolean, ImmutableSet<Expression>> categorizedDistinctAggsChildren 
= aggFuncs.stream()
                 .filter(AggregateFunction::isDistinct)
                 .flatMap(agg -> agg.children().stream())
+                // should not push down literal under aggregate
+                // e.g. group_concat(distinct xxx, ','), the ',' literal show 
stay in aggregate
+                .filter(arg -> !(arg instanceof Literal))
                 .collect(
                         Collectors.groupingBy(
                                 child -> !(child instanceof SlotReference),
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/implementation/AggregateStrategies.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/implementation/AggregateStrategies.java
index 61aac4d2407..edbd28677b4 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/implementation/AggregateStrategies.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/implementation/AggregateStrategies.java
@@ -70,6 +70,7 @@ import 
org.apache.doris.nereids.trees.plans.physical.PhysicalHashAggregate;
 import org.apache.doris.nereids.trees.plans.physical.PhysicalOlapScan;
 import 
org.apache.doris.nereids.trees.plans.physical.PhysicalStorageLayerAggregate;
 import 
org.apache.doris.nereids.trees.plans.physical.PhysicalStorageLayerAggregate.PushDownAggOp;
+import org.apache.doris.nereids.types.TinyIntType;
 import org.apache.doris.nereids.util.ExpressionUtils;
 import org.apache.doris.nereids.util.TypeCoercionUtils;
 import org.apache.doris.qe.ConnectContext;
@@ -1292,6 +1293,15 @@ public class AggregateStrategies implements 
ImplementationRuleFactory {
                 .build();
 
         List<Expression> localAggGroupBy = 
ImmutableList.copyOf(localAggGroupBySet);
+        boolean isGroupByEmptySelectEmpty = localAggGroupBy.isEmpty() && 
localAggOutput.isEmpty();
+
+        // be not recommend generate an aggregate node with empty group by and 
empty output,
+        // so add a null int slot to group by slot and output
+        if (isGroupByEmptySelectEmpty) {
+            localAggGroupBy = ImmutableList.of(new 
NullLiteral(TinyIntType.INSTANCE));
+            localAggOutput = ImmutableList.of(new Alias(new 
NullLiteral(TinyIntType.INSTANCE)));
+        }
+
         boolean maybeUsingStreamAgg = maybeUsingStreamAgg(connectContext, 
localAggGroupBy);
         List<Expression> partitionExpressions = 
getHashAggregatePartitionExpressions(logicalAgg);
         RequireProperties requireAny = 
RequireProperties.of(PhysicalProperties.ANY);
@@ -1317,6 +1327,12 @@ public class AggregateStrategies implements 
ImplementationRuleFactory {
                 .addAll(nonDistinctAggFunctionToAliasPhase2.values())
                 .build();
 
+        // be not recommend generate an aggregate node with empty group by and 
empty output,
+        // so add a null int slot to group by slot and output
+        if (isGroupByEmptySelectEmpty) {
+            globalAggOutput = ImmutableList.of(new Alias(new 
NullLiteral(TinyIntType.INSTANCE)));
+        }
+
         RequireProperties requireGather = 
RequireProperties.of(PhysicalProperties.GATHER);
         PhysicalHashAggregate<Plan> anyLocalGatherGlobalAgg = new 
PhysicalHashAggregate<>(
                 localAggGroupBy, globalAggOutput, 
Optional.of(partitionExpressions),
@@ -1680,6 +1696,16 @@ public class AggregateStrategies implements 
ImplementationRuleFactory {
         boolean maybeUsingStreamAgg = maybeUsingStreamAgg(connectContext, 
localAggGroupBy);
         List<Expression> partitionExpressions = 
getHashAggregatePartitionExpressions(logicalAgg);
         RequireProperties requireAny = 
RequireProperties.of(PhysicalProperties.ANY);
+
+        boolean isGroupByEmptySelectEmpty = localAggGroupBy.isEmpty() && 
localAggOutput.isEmpty();
+
+        // be not recommend generate an aggregate node with empty group by and 
empty output,
+        // so add a null int slot to group by slot and output
+        if (isGroupByEmptySelectEmpty) {
+            localAggGroupBy = ImmutableList.of(new 
NullLiteral(TinyIntType.INSTANCE));
+            localAggOutput = ImmutableList.of(new Alias(new 
NullLiteral(TinyIntType.INSTANCE)));
+        }
+
         PhysicalHashAggregate<Plan> anyLocalAgg = new 
PhysicalHashAggregate<>(localAggGroupBy,
                 localAggOutput, Optional.of(partitionExpressions), 
inputToBufferParam,
                 maybeUsingStreamAgg, Optional.empty(), 
logicalAgg.getLogicalProperties(),
@@ -1702,6 +1728,12 @@ public class AggregateStrategies implements 
ImplementationRuleFactory {
                 .addAll(nonDistinctAggFunctionToAliasPhase2.values())
                 .build();
 
+        // be not recommend generate an aggregate node with empty group by and 
empty output,
+        // so add a null int slot to group by slot and output
+        if (isGroupByEmptySelectEmpty) {
+            globalAggOutput = ImmutableList.of(new Alias(new 
NullLiteral(TinyIntType.INSTANCE)));
+        }
+
         RequireProperties requireGather = 
RequireProperties.of(PhysicalProperties.GATHER);
 
         RequireProperties requireDistinctHash = RequireProperties.of(


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to