This is an automated email from the ASF dual-hosted git repository.
panxiaolei pushed a commit to branch ckb_preview
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/ckb_preview by this push:
new f997850fd86 adjust single streaming threshold
f997850fd86 is described below
commit f997850fd8636a97018514c2537c5197c4b8bbcf
Author: BiteTheDDDDt <[email protected]>
AuthorDate: Thu Mar 5 16:34:33 2026 +0800
adjust single streaming threshold
update
---
.../distinct_streaming_aggregation_operator.cpp | 23 ++++++++++++++----
.../exec/distinct_streaming_aggregation_operator.h | 2 ++
.../exec/streaming_aggregation_operator.cpp | 27 +++++++++++++++++-----
.../pipeline/exec/streaming_aggregation_operator.h | 1 +
be/src/runtime/fragment_mgr.cpp | 3 +++
be/src/runtime/query_context.h | 8 +++++++
.../main/java/org/apache/doris/qe/Coordinator.java | 2 ++
.../org/apache/doris/qe/CoordinatorContext.java | 5 ++++
.../doris/qe/runtime/ThriftPlansBuilder.java | 5 ++++
gensrc/thrift/PaloInternalService.thrift | 5 ++++
10 files changed, 71 insertions(+), 10 deletions(-)
diff --git a/be/src/pipeline/exec/distinct_streaming_aggregation_operator.cpp
b/be/src/pipeline/exec/distinct_streaming_aggregation_operator.cpp
index 1d39488b6c9..5727a9793e0 100644
--- a/be/src/pipeline/exec/distinct_streaming_aggregation_operator.cpp
+++ b/be/src/pipeline/exec/distinct_streaming_aggregation_operator.cpp
@@ -53,6 +53,17 @@ static constexpr StreamingHtMinReductionEntry
STREAMING_HT_MIN_REDUCTION[] = {
{.min_ht_mem = 16 * 1024 * 1024, .streaming_ht_min_reduction = 2.0},
};
+static constexpr StreamingHtMinReductionEntry
SINGLE_BE_STREAMING_HT_MIN_REDUCTION[] = {
+ // Expand up to L2 cache always.
+ {.min_ht_mem = 0, .streaming_ht_min_reduction = 0.0},
+ // Expand into L3 cache if we look like we're getting some reduction.
+ // At present, the L2 cache is generally 1024 KB or more
+ {.min_ht_mem = 256 * 1024, .streaming_ht_min_reduction = 5.0},
+ // Expand into main memory if we're getting a significant reduction.
+ // The L3 cache is generally 16MB or more
+ {.min_ht_mem = 16 * 1024 * 1024, .streaming_ht_min_reduction = 10.0},
+};
+
static constexpr int STREAMING_HT_MIN_REDUCTION_SIZE =
sizeof(STREAMING_HT_MIN_REDUCTION) /
sizeof(STREAMING_HT_MIN_REDUCTION[0]);
@@ -62,7 +73,8 @@
DistinctStreamingAggLocalState::DistinctStreamingAggLocalState(RuntimeState* sta
batch_size(state->batch_size()),
_agg_data(std::make_unique<DistinctDataVariants>()),
_child_block(vectorized::Block::create_unique()),
- _aggregated_block(vectorized::Block::create_unique()) {}
+ _aggregated_block(vectorized::Block::create_unique()),
+
_is_single_backend(state->get_query_ctx()->is_single_backend_query()) {}
Status DistinctStreamingAggLocalState::init(RuntimeState* state,
LocalStateInfo& info) {
RETURN_IF_ERROR(Base::init(state, info));
@@ -113,10 +125,14 @@ bool
DistinctStreamingAggLocalState::_should_expand_preagg_hash_tables() {
return true;
}
+ const auto* reduction = _is_single_backend
+ ?
SINGLE_BE_STREAMING_HT_MIN_REDUCTION
+ :
STREAMING_HT_MIN_REDUCTION;
+
// Find the appropriate reduction factor in our table
for the current hash table sizes.
int cache_level = 0;
while (cache_level + 1 <
STREAMING_HT_MIN_REDUCTION_SIZE &&
- ht_mem >=
STREAMING_HT_MIN_REDUCTION[cache_level + 1].min_ht_mem) {
+ ht_mem >= reduction[cache_level +
1].min_ht_mem) {
++cache_level;
}
@@ -145,8 +161,7 @@ bool
DistinctStreamingAggLocalState::_should_expand_preagg_hash_tables() {
// double estimated_reduction = aggregated_input_rows
>= expected_input_rows
// ? current_reduction
// : 1 + (expected_input_rows /
aggregated_input_rows) * (current_reduction - 1);
- double min_reduction =
-
STREAMING_HT_MIN_REDUCTION[cache_level].streaming_ht_min_reduction;
+ double min_reduction =
reduction[cache_level].streaming_ht_min_reduction;
// COUNTER_SET(preagg_estimated_reduction_,
estimated_reduction);
// COUNTER_SET(preagg_streaming_ht_min_reduction_,
min_reduction);
diff --git a/be/src/pipeline/exec/distinct_streaming_aggregation_operator.h
b/be/src/pipeline/exec/distinct_streaming_aggregation_operator.h
index d4f7a08136b..eb062fe4638 100644
--- a/be/src/pipeline/exec/distinct_streaming_aggregation_operator.h
+++ b/be/src/pipeline/exec/distinct_streaming_aggregation_operator.h
@@ -87,6 +87,8 @@ private:
RuntimeProfile::Counter* _hash_table_input_counter = nullptr;
RuntimeProfile::Counter* _hash_table_size_counter = nullptr;
RuntimeProfile::Counter* _insert_keys_to_column_timer = nullptr;
+
+ bool _is_single_backend = false;
};
class DistinctStreamingAggOperatorX final
diff --git a/be/src/pipeline/exec/streaming_aggregation_operator.cpp
b/be/src/pipeline/exec/streaming_aggregation_operator.cpp
index 86461a8460c..9d716afd31b 100644
--- a/be/src/pipeline/exec/streaming_aggregation_operator.cpp
+++ b/be/src/pipeline/exec/streaming_aggregation_operator.cpp
@@ -66,13 +66,24 @@ struct StreamingHtMinReductionEntry {
// of the machine that we're running on.
static constexpr StreamingHtMinReductionEntry STREAMING_HT_MIN_REDUCTION[] = {
// Expand up to L2 cache always.
- {0, 0.0},
+ {.min_ht_mem = 0, .streaming_ht_min_reduction = 0.0},
// Expand into L3 cache if we look like we're getting some reduction.
// At present, the L2 cache is generally 1024 KB or more
- {1024 * 1024, 1.1},
+ {.min_ht_mem = 256 * 1024, .streaming_ht_min_reduction = 1.1},
// Expand into main memory if we're getting a significant reduction.
// The L3 cache is generally 16MB or more
- {16 * 1024 * 1024, 2.0},
+ {.min_ht_mem = 16 * 1024 * 1024, .streaming_ht_min_reduction = 2.0},
+};
+
+static constexpr StreamingHtMinReductionEntry
SINGLE_BE_STREAMING_HT_MIN_REDUCTION[] = {
+ // Expand up to L2 cache always.
+ {.min_ht_mem = 0, .streaming_ht_min_reduction = 0.0},
+ // Expand into L3 cache if we look like we're getting some reduction.
+ // At present, the L2 cache is generally 1024 KB or more
+ {.min_ht_mem = 256 * 1024, .streaming_ht_min_reduction = 5.0},
+ // Expand into main memory if we're getting a significant reduction.
+ // The L3 cache is generally 16MB or more
+ {.min_ht_mem = 16 * 1024 * 1024, .streaming_ht_min_reduction = 10.0},
};
static constexpr int STREAMING_HT_MIN_REDUCTION_SIZE =
@@ -81,6 +92,7 @@ static constexpr int STREAMING_HT_MIN_REDUCTION_SIZE =
StreamingAggLocalState::StreamingAggLocalState(RuntimeState* state,
OperatorXBase* parent)
: Base(state, parent),
_agg_data(std::make_unique<AggregatedDataVariants>()),
+
_is_single_backend(state->get_query_ctx()->is_single_backend_query()),
_child_block(vectorized::Block::create_unique()),
_pre_aggregated_block(vectorized::Block::create_unique()) {}
@@ -236,10 +248,14 @@ bool
StreamingAggLocalState::_should_expand_preagg_hash_tables() {
return true;
}
+ const auto* reduction = _is_single_backend
+ ?
SINGLE_BE_STREAMING_HT_MIN_REDUCTION
+ :
STREAMING_HT_MIN_REDUCTION;
+
// Find the appropriate reduction factor in our table
for the current hash table sizes.
int cache_level = 0;
while (cache_level + 1 <
STREAMING_HT_MIN_REDUCTION_SIZE &&
- ht_mem >=
STREAMING_HT_MIN_REDUCTION[cache_level + 1].min_ht_mem) {
+ ht_mem >= reduction[cache_level +
1].min_ht_mem) {
++cache_level;
}
@@ -268,8 +284,7 @@ bool
StreamingAggLocalState::_should_expand_preagg_hash_tables() {
// double estimated_reduction = aggregated_input_rows
>= expected_input_rows
// ? current_reduction
// : 1 + (expected_input_rows /
aggregated_input_rows) * (current_reduction - 1);
- double min_reduction =
-
STREAMING_HT_MIN_REDUCTION[cache_level].streaming_ht_min_reduction;
+ double min_reduction =
reduction[cache_level].streaming_ht_min_reduction;
// COUNTER_SET(preagg_estimated_reduction_,
estimated_reduction);
// COUNTER_SET(preagg_streaming_ht_min_reduction_,
min_reduction);
diff --git a/be/src/pipeline/exec/streaming_aggregation_operator.h
b/be/src/pipeline/exec/streaming_aggregation_operator.h
index c90d1ea8a5b..9846ca7b68f 100644
--- a/be/src/pipeline/exec/streaming_aggregation_operator.h
+++ b/be/src/pipeline/exec/streaming_aggregation_operator.h
@@ -116,6 +116,7 @@ private:
std::vector<uint8_t> cmp_res;
std::vector<int> order_directions;
std::vector<int> null_directions;
+ bool _is_single_backend = false;
struct HeapLimitCursor {
HeapLimitCursor(int row_id, vectorized::MutableColumns& limit_columns,
diff --git a/be/src/runtime/fragment_mgr.cpp b/be/src/runtime/fragment_mgr.cpp
index 49f7849187c..39f7b9d0bb7 100644
--- a/be/src/runtime/fragment_mgr.cpp
+++ b/be/src/runtime/fragment_mgr.cpp
@@ -923,6 +923,9 @@ Status FragmentMgr::exec_plan_fragment(const
TPipelineFragmentParams& params,
if (!params.__isset.need_wait_execution_trigger ||
!params.need_wait_execution_trigger) {
query_ctx->set_ready_to_execute_only();
}
+ query_ctx->set_single_backend_query(params.__isset.query_options &&
+
params.query_options.__isset.single_backend_query &&
+
params.query_options.single_backend_query);
query_ctx->set_pipeline_context(params.fragment_id, context);
diff --git a/be/src/runtime/query_context.h b/be/src/runtime/query_context.h
index 5c77a427f59..7109901d35d 100644
--- a/be/src/runtime/query_context.h
+++ b/be/src/runtime/query_context.h
@@ -109,6 +109,12 @@ public:
return _query_watcher.elapsed_time_seconds(now) > _timeout_second;
}
+ bool is_single_backend_query() const { return _is_single_backend_query; }
+
+ void set_single_backend_query(bool is_single_backend_query) {
+ _is_single_backend_query = is_single_backend_query;
+ }
+
int64_t get_remaining_query_time_seconds() const {
timespec now;
clock_gettime(CLOCK_MONOTONIC, &now);
@@ -388,6 +394,8 @@ private:
std::string _load_error_url;
std::string _first_error_msg;
+ bool _is_single_backend_query = false;
+
// file cache context holders
std::vector<io::BlockFileCache::QueryFileCacheContextHolderPtr>
_query_context_holders;
// instance id + node id -> cte scan
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
index d0f002a3f15..f0705f8e195 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
@@ -835,6 +835,7 @@ public class Coordinator implements CoordInterface {
// TableValuedFunctionScanNode, we should ensure
TableValuedFunctionScanNode does not
// send data until ExchangeNode is ready to receive.
boolean twoPhaseExecution = fragments.size() > 1;
+ boolean isSingleBackend = addressToBackendID.size() == 1;
for (PlanFragment fragment : fragments) {
FragmentExecParams params =
fragmentExecParamsMap.get(fragment.getFragmentId());
@@ -871,6 +872,7 @@ public class Coordinator implements CoordInterface {
entry.getValue().setFragmentNumOnHost(hostCounter.count(pipelineExecContext.address));
entry.getValue().setBackendId(pipelineExecContext.backend.getId());
entry.getValue().setNeedWaitExecutionTrigger(twoPhaseExecution);
+
entry.getValue().getQueryOptions().setSingleBackendQuery(isSingleBackend);
entry.getValue().setFragmentId(fragment.getFragmentId().asInt());
pipelineExecContexts.put(Pair.of(fragment.getFragmentId().asInt(), backendId),
pipelineExecContext);
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/qe/CoordinatorContext.java
b/fe/fe-core/src/main/java/org/apache/doris/qe/CoordinatorContext.java
index f3c825cf9d8..f1a124f487c 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/CoordinatorContext.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/CoordinatorContext.java
@@ -103,6 +103,7 @@ public class CoordinatorContext {
public final Supplier<Set<TUniqueId>> instanceIds =
Suppliers.memoize(this::getInstanceIds);
public final Supplier<Map<TNetworkAddress, Long>> backends =
Suppliers.memoize(this::getBackends);
public final Supplier<Integer> scanRangeNum =
Suppliers.memoize(this::getScanRangeNum);
+ public final Supplier<Boolean> isSingleBackendQuery =
Suppliers.memoize(this::computeIsSingleBackendQuery);
public final Supplier<TNetworkAddress> directConnectFrontendAddress
= Suppliers.memoize(this::computeDirectConnectCoordinator);
@@ -447,6 +448,10 @@ public class CoordinatorContext {
return scanRangeNum;
}
+ private boolean computeIsSingleBackendQuery() {
+ return backends.get().size() == 1;
+ }
+
private int computeScanRangeNumByScanRange(TScanRangeParams param) {
int scanRangeNum = 0;
TScanRange scanRange = param.getScanRange();
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/qe/runtime/ThriftPlansBuilder.java
b/fe/fe-core/src/main/java/org/apache/doris/qe/runtime/ThriftPlansBuilder.java
index 6ffa8de61a4..0738876282f 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/qe/runtime/ThriftPlansBuilder.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/qe/runtime/ThriftPlansBuilder.java
@@ -106,6 +106,11 @@ public class ThriftPlansBuilder {
Set<Integer> fragmentToNotifyClose =
setParamsForRecursiveCteNode(distributedPlans,
coordinatorContext.runtimeFilters);
+ // Determine whether this query is assigned to a single backend and
propagate it to
+ // TQueryOptions so that BE can apply more appropriate optimization
strategies (e.g.
+ // streaming aggregation hash table thresholds).
+
coordinatorContext.queryOptions.setSingleBackendQuery(coordinatorContext.isSingleBackendQuery.get());
+
// we should set runtime predicate first, then we can use heap sort
and to thrift
setRuntimePredicateIfNeed(coordinatorContext.scanNodes);
diff --git a/gensrc/thrift/PaloInternalService.thrift
b/gensrc/thrift/PaloInternalService.thrift
index e495d529ef6..495c1477647 100644
--- a/gensrc/thrift/PaloInternalService.thrift
+++ b/gensrc/thrift/PaloInternalService.thrift
@@ -440,6 +440,11 @@ struct TQueryOptions {
// Use paimon-cpp to read Paimon splits on BE
201: optional bool enable_paimon_cpp_reader = false;
+ // Whether all fragments of this query are assigned to a single backend.
+ // When true, the streaming aggregation operator can use more aggressive
+ // hash table expansion thresholds since all data is local.
+ 202: optional bool single_backend_query = false;
+
// For cloud, to control if the content would be written into file cache
// In write path, to control if the content would be written into file cache.
// In read path, read from file cache or remote storage when execute query.
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]