This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git

commit b2de83f250f2a4363f691c5b7a091b359a2c003d
Author: Gabriel <gabrielleeb...@gmail.com>
AuthorDate: Thu Mar 7 23:00:29 2024 +0800

    [agg](conf) Add a knob to control distinct agg (#31930)
    
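    Add a knob to control distinct agg: the new session variable
    `enable_distinct_streaming_aggregation` (default: true) is passed to the
    BE through TQueryOptions, and the pipelineX engine only creates a
    DistinctStreamingAggOperatorX for an aggregation node without aggregate
    functions when this option is set and true.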
---
 be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp        | 4 +++-
 fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java | 5 +++++
 gensrc/thrift/PaloInternalService.thrift                          | 2 ++
 3 files changed, 10 insertions(+), 1 deletion(-)

diff --git a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp
index 01dc49d3667..1a7f141168f 100644
--- a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp
+++ b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp
@@ -971,7 +971,9 @@ Status PipelineXFragmentContext::_create_operator(ObjectPool* pool, const TPlanN
         break;
     }
     case TPlanNodeType::AGGREGATION_NODE: {
-        if (tnode.agg_node.aggregate_functions.empty()) {
+        if (tnode.agg_node.aggregate_functions.empty() &&
+            request.query_options.__isset.enable_distinct_streaming_aggregation &&
+            request.query_options.enable_distinct_streaming_aggregation) {
             op.reset(new DistinctStreamingAggOperatorX(pool, next_operator_id(), tnode, descs));
             RETURN_IF_ERROR(cur_pipe->add_operator(op));
         } else if (tnode.agg_node.__isset.use_streaming_preaggregation &&
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
index 164e06e0856..f36e831a5c4 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
@@ -116,6 +116,7 @@ public class SessionVariable implements Serializable, Writable {
     public static final int MIN_EXEC_MEM_LIMIT = 2097152;
     public static final String BATCH_SIZE = "batch_size";
     public static final String DISABLE_STREAMING_PREAGGREGATIONS = "disable_streaming_preaggregations";
+    public static final String ENABLE_DISTINCT_STREAMING_AGGREGATION = "enable_distinct_streaming_aggregation";
     public static final String DISABLE_COLOCATE_PLAN = "disable_colocate_plan";
     public static final String ENABLE_BUCKET_SHUFFLE_JOIN = "enable_bucket_shuffle_join";
     public static final String PARALLEL_FRAGMENT_EXEC_INSTANCE_NUM = "parallel_fragment_exec_instance_num";
@@ -736,6 +737,9 @@ public class SessionVariable implements Serializable, Writable {
     @VariableMgr.VarAttr(name = DISABLE_STREAMING_PREAGGREGATIONS, fuzzy = true)
     public boolean disableStreamPreaggregations = false;
 
+    @VariableMgr.VarAttr(name = ENABLE_DISTINCT_STREAMING_AGGREGATION, fuzzy = true)
+    public boolean enableDistinctStreamingAggregation = true;
+
     @VariableMgr.VarAttr(name = DISABLE_COLOCATE_PLAN)
     public boolean disableColocatePlan = false;
 
@@ -2889,6 +2893,7 @@ public class SessionVariable implements Serializable, Writable {
 
         tResult.setBatchSize(batchSize);
         tResult.setDisableStreamPreaggregations(disableStreamPreaggregations);
+        tResult.setEnableDistinctStreamingAggregation(enableDistinctStreamingAggregation);
 
         if (maxScanKeyNum > -1) {
             tResult.setMaxScanKeyNum(maxScanKeyNum);
diff --git a/gensrc/thrift/PaloInternalService.thrift b/gensrc/thrift/PaloInternalService.thrift
index 64024bed2aa..e846b32bff5 100644
--- a/gensrc/thrift/PaloInternalService.thrift
+++ b/gensrc/thrift/PaloInternalService.thrift
@@ -274,6 +274,8 @@ struct TQueryOptions {
   // Increase concurrency of scanners adaptively, the maxinum times to scale up
   99: optional double scanner_scale_up_ratio = 0;
 
+  100: optional bool enable_distinct_streaming_aggregation = true;
+
   // For cloud, to control if the content would be written into file cache
   1000: optional bool disable_file_cache = false
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to