This is an automated email from the ASF dual-hosted git repository.

panxiaolei pushed a commit to branch ckb_preview
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/ckb_preview by this push:
     new f997850fd86 adjust single streaming threshold
f997850fd86 is described below

commit f997850fd8636a97018514c2537c5197c4b8bbcf
Author: BiteTheDDDDt <[email protected]>
AuthorDate: Thu Mar 5 16:34:33 2026 +0800

    adjust single streaming threshold
    
    update
---
 .../distinct_streaming_aggregation_operator.cpp    | 23 ++++++++++++++----
 .../exec/distinct_streaming_aggregation_operator.h |  2 ++
 .../exec/streaming_aggregation_operator.cpp        | 27 +++++++++++++++++-----
 .../pipeline/exec/streaming_aggregation_operator.h |  1 +
 be/src/runtime/fragment_mgr.cpp                    |  3 +++
 be/src/runtime/query_context.h                     |  8 +++++++
 .../main/java/org/apache/doris/qe/Coordinator.java |  2 ++
 .../org/apache/doris/qe/CoordinatorContext.java    |  5 ++++
 .../doris/qe/runtime/ThriftPlansBuilder.java       |  5 ++++
 gensrc/thrift/PaloInternalService.thrift           |  5 ++++
 10 files changed, 71 insertions(+), 10 deletions(-)

diff --git a/be/src/pipeline/exec/distinct_streaming_aggregation_operator.cpp 
b/be/src/pipeline/exec/distinct_streaming_aggregation_operator.cpp
index 1d39488b6c9..5727a9793e0 100644
--- a/be/src/pipeline/exec/distinct_streaming_aggregation_operator.cpp
+++ b/be/src/pipeline/exec/distinct_streaming_aggregation_operator.cpp
@@ -53,6 +53,17 @@ static constexpr StreamingHtMinReductionEntry 
STREAMING_HT_MIN_REDUCTION[] = {
         {.min_ht_mem = 16 * 1024 * 1024, .streaming_ht_min_reduction = 2.0},
 };
 
+static constexpr StreamingHtMinReductionEntry 
SINGLE_BE_STREAMING_HT_MIN_REDUCTION[] = {
+        // Expand up to L2 cache always (minimum reduction factor 0.0).
+        {.min_ht_mem = 0, .streaming_ht_min_reduction = 0.0},
+        // Expand into L3 cache if we look like we're getting some reduction.
+        // The L2 cache is generally 1024k or more. NOTE(review): entry starts at 256k - confirm.
+        {.min_ht_mem = 256 * 1024, .streaming_ht_min_reduction = 5.0},
+        // Expand into main memory if we're getting a significant reduction.
+        // The L3 cache is generally 16MB or more.
+        {.min_ht_mem = 16 * 1024 * 1024, .streaming_ht_min_reduction = 10.0},
+};
+
 static constexpr int STREAMING_HT_MIN_REDUCTION_SIZE =
         sizeof(STREAMING_HT_MIN_REDUCTION) / 
sizeof(STREAMING_HT_MIN_REDUCTION[0]);
 
@@ -62,7 +73,8 @@ 
DistinctStreamingAggLocalState::DistinctStreamingAggLocalState(RuntimeState* sta
           batch_size(state->batch_size()),
           _agg_data(std::make_unique<DistinctDataVariants>()),
           _child_block(vectorized::Block::create_unique()),
-          _aggregated_block(vectorized::Block::create_unique()) {}
+          _aggregated_block(vectorized::Block::create_unique()),
+          
_is_single_backend(state->get_query_ctx()->is_single_backend_query()) {}
 
 Status DistinctStreamingAggLocalState::init(RuntimeState* state, 
LocalStateInfo& info) {
     RETURN_IF_ERROR(Base::init(state, info));
@@ -113,10 +125,14 @@ bool 
DistinctStreamingAggLocalState::_should_expand_preagg_hash_tables() {
                             return true;
                         }
 
+                        const auto* reduction = _is_single_backend
+                                                        ? 
SINGLE_BE_STREAMING_HT_MIN_REDUCTION
+                                                        : 
STREAMING_HT_MIN_REDUCTION;
+
                         // Find the appropriate reduction factor in our table 
for the current hash table sizes.
                         int cache_level = 0;
                         while (cache_level + 1 < 
STREAMING_HT_MIN_REDUCTION_SIZE &&
-                               ht_mem >= 
STREAMING_HT_MIN_REDUCTION[cache_level + 1].min_ht_mem) {
+                               ht_mem >= reduction[cache_level + 
1].min_ht_mem) {
                             ++cache_level;
                         }
 
@@ -145,8 +161,7 @@ bool 
DistinctStreamingAggLocalState::_should_expand_preagg_hash_tables() {
                         //  double estimated_reduction = aggregated_input_rows 
>= expected_input_rows
                         //      ? current_reduction
                         //      : 1 + (expected_input_rows / 
aggregated_input_rows) * (current_reduction - 1);
-                        double min_reduction =
-                                
STREAMING_HT_MIN_REDUCTION[cache_level].streaming_ht_min_reduction;
+                        double min_reduction = 
reduction[cache_level].streaming_ht_min_reduction;
 
                         //  COUNTER_SET(preagg_estimated_reduction_, 
estimated_reduction);
                         //    COUNTER_SET(preagg_streaming_ht_min_reduction_, 
min_reduction);
diff --git a/be/src/pipeline/exec/distinct_streaming_aggregation_operator.h 
b/be/src/pipeline/exec/distinct_streaming_aggregation_operator.h
index d4f7a08136b..eb062fe4638 100644
--- a/be/src/pipeline/exec/distinct_streaming_aggregation_operator.h
+++ b/be/src/pipeline/exec/distinct_streaming_aggregation_operator.h
@@ -87,6 +87,8 @@ private:
     RuntimeProfile::Counter* _hash_table_input_counter = nullptr;
     RuntimeProfile::Counter* _hash_table_size_counter = nullptr;
     RuntimeProfile::Counter* _insert_keys_to_column_timer = nullptr;
+
+    bool _is_single_backend = false;
 };
 
 class DistinctStreamingAggOperatorX final
diff --git a/be/src/pipeline/exec/streaming_aggregation_operator.cpp 
b/be/src/pipeline/exec/streaming_aggregation_operator.cpp
index 86461a8460c..9d716afd31b 100644
--- a/be/src/pipeline/exec/streaming_aggregation_operator.cpp
+++ b/be/src/pipeline/exec/streaming_aggregation_operator.cpp
@@ -66,13 +66,24 @@ struct StreamingHtMinReductionEntry {
 // of the machine that we're running on.
 static constexpr StreamingHtMinReductionEntry STREAMING_HT_MIN_REDUCTION[] = {
         // Expand up to L2 cache always.
-        {0, 0.0},
+        {.min_ht_mem = 0, .streaming_ht_min_reduction = 0.0},
         // Expand into L3 cache if we look like we're getting some reduction.
         // At present, The L2 cache is generally 1024k or more
-        {1024 * 1024, 1.1},
+        {.min_ht_mem = 256 * 1024, .streaming_ht_min_reduction = 1.1},
         // Expand into main memory if we're getting a significant reduction.
         // The L3 cache is generally 16MB or more
-        {16 * 1024 * 1024, 2.0},
+        {.min_ht_mem = 16 * 1024 * 1024, .streaming_ht_min_reduction = 2.0},
+};
+
+static constexpr StreamingHtMinReductionEntry 
SINGLE_BE_STREAMING_HT_MIN_REDUCTION[] = {
+        // Expand up to L2 cache always (minimum reduction factor 0.0).
+        {.min_ht_mem = 0, .streaming_ht_min_reduction = 0.0},
+        // Expand into L3 cache if we look like we're getting some reduction.
+        // The L2 cache is generally 1024k or more. NOTE(review): entry starts at 256k - confirm.
+        {.min_ht_mem = 256 * 1024, .streaming_ht_min_reduction = 5.0},
+        // Expand into main memory if we're getting a significant reduction.
+        // The L3 cache is generally 16MB or more.
+        {.min_ht_mem = 16 * 1024 * 1024, .streaming_ht_min_reduction = 10.0},
 };
 
 static constexpr int STREAMING_HT_MIN_REDUCTION_SIZE =
@@ -81,6 +92,7 @@ static constexpr int STREAMING_HT_MIN_REDUCTION_SIZE =
 StreamingAggLocalState::StreamingAggLocalState(RuntimeState* state, 
OperatorXBase* parent)
         : Base(state, parent),
           _agg_data(std::make_unique<AggregatedDataVariants>()),
+          
_is_single_backend(state->get_query_ctx()->is_single_backend_query()),
           _child_block(vectorized::Block::create_unique()),
           _pre_aggregated_block(vectorized::Block::create_unique()) {}
 
@@ -236,10 +248,14 @@ bool 
StreamingAggLocalState::_should_expand_preagg_hash_tables() {
                             return true;
                         }
 
+                        const auto* reduction = _is_single_backend
+                                                        ? 
SINGLE_BE_STREAMING_HT_MIN_REDUCTION
+                                                        : 
STREAMING_HT_MIN_REDUCTION;
+
                         // Find the appropriate reduction factor in our table 
for the current hash table sizes.
                         int cache_level = 0;
                         while (cache_level + 1 < 
STREAMING_HT_MIN_REDUCTION_SIZE &&
-                               ht_mem >= 
STREAMING_HT_MIN_REDUCTION[cache_level + 1].min_ht_mem) {
+                               ht_mem >= reduction[cache_level + 
1].min_ht_mem) {
                             ++cache_level;
                         }
 
@@ -268,8 +284,7 @@ bool 
StreamingAggLocalState::_should_expand_preagg_hash_tables() {
                         //  double estimated_reduction = aggregated_input_rows 
>= expected_input_rows
                         //      ? current_reduction
                         //      : 1 + (expected_input_rows / 
aggregated_input_rows) * (current_reduction - 1);
-                        double min_reduction =
-                                
STREAMING_HT_MIN_REDUCTION[cache_level].streaming_ht_min_reduction;
+                        double min_reduction = 
reduction[cache_level].streaming_ht_min_reduction;
 
                         //  COUNTER_SET(preagg_estimated_reduction_, 
estimated_reduction);
                         //    COUNTER_SET(preagg_streaming_ht_min_reduction_, 
min_reduction);
diff --git a/be/src/pipeline/exec/streaming_aggregation_operator.h 
b/be/src/pipeline/exec/streaming_aggregation_operator.h
index c90d1ea8a5b..9846ca7b68f 100644
--- a/be/src/pipeline/exec/streaming_aggregation_operator.h
+++ b/be/src/pipeline/exec/streaming_aggregation_operator.h
@@ -116,6 +116,7 @@ private:
     std::vector<uint8_t> cmp_res;
     std::vector<int> order_directions;
     std::vector<int> null_directions;
+    bool _is_single_backend = false;
 
     struct HeapLimitCursor {
         HeapLimitCursor(int row_id, vectorized::MutableColumns& limit_columns,
diff --git a/be/src/runtime/fragment_mgr.cpp b/be/src/runtime/fragment_mgr.cpp
index 49f7849187c..39f7b9d0bb7 100644
--- a/be/src/runtime/fragment_mgr.cpp
+++ b/be/src/runtime/fragment_mgr.cpp
@@ -923,6 +923,9 @@ Status FragmentMgr::exec_plan_fragment(const 
TPipelineFragmentParams& params,
     if (!params.__isset.need_wait_execution_trigger || 
!params.need_wait_execution_trigger) {
         query_ctx->set_ready_to_execute_only();
     }
+    query_ctx->set_single_backend_query(params.__isset.query_options &&
+                                        
params.query_options.__isset.single_backend_query &&
+                                        
params.query_options.single_backend_query);
 
     query_ctx->set_pipeline_context(params.fragment_id, context);
 
diff --git a/be/src/runtime/query_context.h b/be/src/runtime/query_context.h
index 5c77a427f59..7109901d35d 100644
--- a/be/src/runtime/query_context.h
+++ b/be/src/runtime/query_context.h
@@ -109,6 +109,12 @@ public:
         return _query_watcher.elapsed_time_seconds(now) > _timeout_second;
     }
 
+    bool is_single_backend_query() const { return _is_single_backend_query; }
+
+    void set_single_backend_query(bool is_single_backend_query) {
+        _is_single_backend_query = is_single_backend_query;
+    }
+
     int64_t get_remaining_query_time_seconds() const {
         timespec now;
         clock_gettime(CLOCK_MONOTONIC, &now);
@@ -388,6 +394,8 @@ private:
     std::string _load_error_url;
     std::string _first_error_msg;
 
+    bool _is_single_backend_query = false;
+
     // file cache context holders
     std::vector<io::BlockFileCache::QueryFileCacheContextHolderPtr> 
_query_context_holders;
     // instance id + node id -> cte scan
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java 
b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
index d0f002a3f15..f0705f8e195 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
@@ -835,6 +835,7 @@ public class Coordinator implements CoordInterface {
             // TableValuedFunctionScanNode, we should ensure 
TableValuedFunctionScanNode does not
             // send data until ExchangeNode is ready to receive.
             boolean twoPhaseExecution = fragments.size() > 1;
+            boolean isSingleBackend = addressToBackendID.size() == 1;
             for (PlanFragment fragment : fragments) {
                 FragmentExecParams params = 
fragmentExecParamsMap.get(fragment.getFragmentId());
 
@@ -871,6 +872,7 @@ public class Coordinator implements CoordInterface {
                     
entry.getValue().setFragmentNumOnHost(hostCounter.count(pipelineExecContext.address));
                     
entry.getValue().setBackendId(pipelineExecContext.backend.getId());
                     
entry.getValue().setNeedWaitExecutionTrigger(twoPhaseExecution);
+                    
entry.getValue().getQueryOptions().setSingleBackendQuery(isSingleBackend);
                     
entry.getValue().setFragmentId(fragment.getFragmentId().asInt());
 
                     
pipelineExecContexts.put(Pair.of(fragment.getFragmentId().asInt(), backendId), 
pipelineExecContext);
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/qe/CoordinatorContext.java 
b/fe/fe-core/src/main/java/org/apache/doris/qe/CoordinatorContext.java
index f3c825cf9d8..f1a124f487c 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/CoordinatorContext.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/CoordinatorContext.java
@@ -103,6 +103,7 @@ public class CoordinatorContext {
     public final Supplier<Set<TUniqueId>> instanceIds = 
Suppliers.memoize(this::getInstanceIds);
     public final Supplier<Map<TNetworkAddress, Long>> backends = 
Suppliers.memoize(this::getBackends);
     public final Supplier<Integer> scanRangeNum = 
Suppliers.memoize(this::getScanRangeNum);
+    public final Supplier<Boolean> isSingleBackendQuery = 
Suppliers.memoize(this::computeIsSingleBackendQuery);
     public final Supplier<TNetworkAddress> directConnectFrontendAddress
             = Suppliers.memoize(this::computeDirectConnectCoordinator);
 
@@ -447,6 +448,10 @@ public class CoordinatorContext {
         return scanRangeNum;
     }
 
+    private boolean computeIsSingleBackendQuery() {
+        return backends.get().size() == 1;
+    }
+
     private int computeScanRangeNumByScanRange(TScanRangeParams param) {
         int scanRangeNum = 0;
         TScanRange scanRange = param.getScanRange();
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/qe/runtime/ThriftPlansBuilder.java 
b/fe/fe-core/src/main/java/org/apache/doris/qe/runtime/ThriftPlansBuilder.java
index 6ffa8de61a4..0738876282f 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/qe/runtime/ThriftPlansBuilder.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/qe/runtime/ThriftPlansBuilder.java
@@ -106,6 +106,11 @@ public class ThriftPlansBuilder {
         Set<Integer> fragmentToNotifyClose = 
setParamsForRecursiveCteNode(distributedPlans,
                 coordinatorContext.runtimeFilters);
 
+        // Determine whether this query is assigned to a single backend and 
propagate it to
+        // TQueryOptions so that BE can apply more appropriate optimization 
strategies (e.g.
+        // streaming aggregation hash table thresholds).
+        
coordinatorContext.queryOptions.setSingleBackendQuery(coordinatorContext.isSingleBackendQuery.get());
+
         // we should set runtime predicate first, then we can use heap sort 
and to thrift
         setRuntimePredicateIfNeed(coordinatorContext.scanNodes);
 
diff --git a/gensrc/thrift/PaloInternalService.thrift 
b/gensrc/thrift/PaloInternalService.thrift
index e495d529ef6..495c1477647 100644
--- a/gensrc/thrift/PaloInternalService.thrift
+++ b/gensrc/thrift/PaloInternalService.thrift
@@ -440,6 +440,11 @@ struct TQueryOptions {
   // Use paimon-cpp to read Paimon splits on BE
   201: optional bool enable_paimon_cpp_reader = false;
 
+  // Whether all fragments of this query are assigned to a single backend.
+  // When true, the streaming aggregation operators on BE select a separate
+  // table of hash table expansion thresholds (higher minimum reduction factors).
+  202: optional bool single_backend_query = false;
+
   // For cloud, to control if the content would be written into file cache
   // In write path, to control if the content would be written into file cache.
   // In read path, read from file cache or remote storage when execute query.


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to