This is an automated email from the ASF dual-hosted git repository.
gabriellee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 92f75f3d356 [feature](memory) Global mem control on scan nodes (#61271)
92f75f3d356 is described below
commit 92f75f3d3568f3031ed8de0bd6b3c09a7ea1cfb0
Author: Gabriel <[email protected]>
AuthorDate: Wed Apr 1 14:12:27 2026 +0800
[feature](memory) Global mem control on scan nodes (#61271)
Overview

This commit introduces a global memory control system for scan nodes in
Apache Doris, enabling adaptive scanner concurrency management based on
available memory. The system prevents OOM (Out Of Memory) errors by
dynamically adjusting scanner parallelism according to query-level memory
constraints.
---
Architecture: Three-Layer Memory Control System

Layer 1: Query Level - ScannerMemShareArbitrator

Purpose: Fair memory distribution across all scan operators in a query

QueryContext (owns)
└── ScannerMemShareArbitrator
    ├── query_mem_limit: Total query memory (e.g., 100GB)
    ├── scan_mem_limit: Memory allocated for all scans (query_mem_limit × max_scan_mem_ratio)
    └── total_scanner_mem_bytes: Atomic counter tracking total memory usage

Key Responsibilities:
- Distributes scan_mem_limit proportionally across multiple scan nodes
- Tracks global scanner memory usage atomically
- Implements proportional sharing algorithm
Layer 2: Scan Node Level - ScannerMemLimiter

Purpose: Control scanner concurrency for a specific scan node across all
instances

ScanOperatorX (owns)
└── ScannerMemLimiter
    ├── parallelism: Number of parallel instances
    ├── scan_mem_limit: Memory allocated by arbitrator (updated dynamically)
    ├── estimated_block_mem_bytes: Average memory per block (running average)
    └── running_scanner_count: Active scanners across all instances

Key Responsibilities:
- Calculates available scanner count based on memory
- Maintains running average of block memory usage
- Distributes scanners fairly across parallel instances
Layer 3: Instance Level - ScannerContext

Purpose: Manages scanners for a single scan operator instance

ScannerContext (per instance)
├── References: _mem_share_arb, _scanner_mem_limiter
├── _ins_idx: Instance index (0, 1, 2, ...)
├── _enable_adaptive_scanners: Feature flag
└── _adaptive_processor: Adaptive scheduling logic
---
How It Works: Memory Flow in a Query

Phase 1: Initialization (Query Start)

1. QueryContext created
   └── Creates ScannerMemShareArbitrator
       ├── query_mem_limit = 100GB (from session variable)
       ├── max_scan_mem_ratio = 0.3 (30% of query memory for scans)
       └── scan_mem_limit = 30GB

2. Each ScanOperatorX (Scan Node) initialized
   ├── Registers with arbitrator
   │   └── arbitrator->register_scan_node()
   │       └── total_scanner_mem_bytes += 64MB (DEFAULT_SCANNER_MEM_BYTES)
   └── Creates ScannerMemLimiter
       ├── parallelism = 4 (query has 4 parallel instances)
       ├── serial_scan = false
       └── query_scan_mem_limit = 30GB

3. Each ScannerContext (Instance) initialized
   └── ScannerContext::init()
       ├── update_open_scanner_context_count(1)
       ├── reestimated_block_mem_bytes(64MB) // Initial estimate
       └── _adjust_scan_mem_limit(64MB, 64MB) // Register with arbitrator
---
Phase 2: Runtime Adaptation (During Execution)

Memory Re-estimation Loop

// When a block is scanned
ScannerContext::reestimated_block_mem_bytes(actual_block_size)
└─> ScannerMemLimiter::reestimated_block_mem_bytes(actual_block_size)
    └─> Running average calculation:
        new_estimate = (old_estimate × count + actual_size) / (count + 1)
        // Example: 64MB → 100MB → avg = 82MB
Memory Limit Adjustment

ScannerContext::_adjust_scan_mem_limit(old_mem, new_mem)
├─> ScannerMemShareArbitrator::update_scanner_mem_bytes(old_mem, new_mem)
│   ├── Update total: total_scanner_mem_bytes += (new_mem - old_mem)
│   ├── Calculate ratio: ratio = new_mem / total
│   └── Return proportional share: scan_mem_limit × ratio
└─> ScannerMemLimiter::update_scan_mem_limit(new_limit)
    └── scan_mem_limit = new_limit (atomic update)
Scanner Count Calculation

ScannerContext::_available_pickup_scanner_count()
└─> ScannerMemLimiter::available_scanner_count(ins_idx)
    ├── max_count = scan_mem_limit / estimated_block_mem_bytes
    │   // Example: 10GB / 100MB = 100 scanners total
    ├── per_count = max_count / parallelism
    │   // Example: 100 / 4 = 25 scanners per instance
    └── Distribution logic:
        if (serial_scan)
            instance[0] gets all (max_count)
        else
            instances distribute evenly with remainder to first instances
            // Instances 0,1,2,3 each get 25 scanners
---
Real-World Example

Scenario: Query with 2 scan nodes, 4 parallel instances

Query Memory: 100GB
max_scan_mem_ratio: 0.3
Scan Memory Budget: 30GB

Initial State

ScannerMemShareArbitrator:
    scan_mem_limit = 30GB
    total_scanner_mem_bytes = 128MB (2 nodes × 64MB)
ScanNode1_Limiter:
    estimated_block_mem_bytes = 64MB
ScanNode2_Limiter:
    estimated_block_mem_bytes = 64MB
After ScanNode1 scans some data

ScanNode1_Context[0]::reestimated_block_mem_bytes(200MB)
└─> ScanNode1_Limiter::estimated_block_mem_bytes = 200MB (updated)
└─> _adjust_scan_mem_limit(64MB, 200MB)
    └─> Arbitrator updates:
        total = 128MB - 64MB + 200MB = 264MB
        ratio = 200MB / 264MB = 75.8%
        ScanNode1_limit = 30GB × 75.8% = 22.7GB

ScanNode1_Limiter::available_scanner_count(0)
    max_count = 22.7GB / 200MB = 116 scanners
    per_count = 116 / 4 = 29 scanners per instance

ScanNode2 gets remaining memory

ScanNode2_Context[0]::_adjust_scan_mem_limit(64MB, 64MB)
└─> Arbitrator updates:
        ratio = 64MB / 264MB = 24.2%
        ScanNode2_limit = 30GB × 24.2% = 7.3GB

ScanNode2_Limiter::available_scanner_count(0)
    max_count = 7.3GB / 64MB = 116 scanners
    per_count = 116 / 4 = 29 scanners per instance

Result: Memory dynamically allocated based on actual usage!
---
Key Benefits

1. Prevents OOM
   - Limits total scan memory to query_mem_limit × max_scan_mem_ratio
   - Dynamically adjusts scanner concurrency to stay within limits
2. Fair Resource Sharing
   - Multiple scan nodes share memory proportionally
   - Nodes using more memory get proportionally more budget
3. Adaptive to Workload
   - Small blocks → More scanners allowed
   - Large blocks → Fewer scanners to prevent OOM
4. Instance-Level Fairness
   - Scanners distributed evenly across parallel instances
   - Serial mode gives all scanners to first instance
---
Configuration
Session Variables (FE)
-- Enable adaptive scanning (default: varies)
SET enable_adaptive_scan = true;
-- Max ratio of query memory for scans (default: 0.3 = 30%)
SET max_scan_mem_ratio = 0.3;
-- Total query memory limit
SET exec_mem_limit = 107374182400; -- 100GB
BE Configuration
// Minimum interval between scanner count adjustments
config::doris_scanner_dynamic_interval_ms = 100; // 100ms
---
Algorithm Deep Dive

Proportional Sharing Algorithm

int64_t ScannerMemShareArbitrator::update_scanner_mem_bytes(int64_t old_value,
                                                            int64_t new_value) {
    // Atomic update of total
    int64_t diff = new_value - old_value;
    int64_t total = total_scanner_mem_bytes.fetch_add(diff) + diff;
    // Special cases
    if (new_value == 0) return 0;          // Context closing
    if (total <= 0) return scan_mem_limit; // Only context active
    // Proportional allocation
    double ratio = static_cast<double>(new_value) /
                   static_cast<double>(std::max(total, new_value));
    return static_cast<int64_t>(scan_mem_limit * ratio);
}

Example Calculation:
- ScanNode1 uses 200MB, ScanNode2 uses 64MB → total = 264MB
- ScanNode1 ratio = 200/264 = 75.8% → gets 22.7GB
- ScanNode2 ratio = 64/264 = 24.2% → gets 7.3GB
- Total: 22.7GB + 7.3GB = 30GB ✓
Scanner Distribution Algorithm

int ScannerMemLimiter::available_scanner_count(int ins_idx) const {
    int64_t max_count = scan_mem_limit / estimated_block_mem_bytes;
    int64_t per_count = max_count / parallelism;
    if (serial_scan) {
        // First instance gets all scanners + remainder
        per_count += (max_count - per_count * parallelism);
    } else if (ins_idx < max_count - per_count * parallelism) {
        // Distribute remainder to first instances
        per_count += 1;
    }
    return per_count;
}
Example (100 scanners, 4 instances):
- Base: 100 / 4 = 25 per instance
- Remainder: 100 % 4 = 0
- Distribution: [25, 25, 25, 25]
Example (103 scanners, 4 instances):
- Base: 103 / 4 = 25 per instance
- Remainder: 103 % 4 = 3
- Distribution: [26, 26, 26, 25] (first 3 get +1)
---
Lifecycle Management
Context Opening
ScannerContext::init() {
if (enable_adaptive_scanners) {
int64_t c = limiter->update_open_scanner_context_count(1);
limiter->reestimated_block_mem_bytes(64MB);
if (c == 0) { // First context for this scan node
_adjust_scan_mem_limit(64MB, 64MB); // Register with arbitrator
}
}
}
Context Closing
ScannerContext::~ScannerContext() {
if (enable_adaptive_scanners) {
if (limiter->update_open_scanner_context_count(-1) == 1) {
// Last context closing
_adjust_scan_mem_limit(current_mem, 0); // Release to arbitrator
}
}
}
---
Future Enhancements (TODOs in Code)
The commit includes commented-out code for future performance-based
adaptation:
1. IO Latency Monitoring: Adjust scanners based on disk/network speed
2. Source Speed Tracking: Monitor block production rate
3. Adaptive Speedup: Incrementally test if adding scanners improves
throughput
4. Slow IO Detection: Reduce scanners when IO becomes bottleneck
---
This comprehensive memory control system ensures Doris can handle
large-scale scans efficiently while preventing memory exhaustion, making
the system more robust and predictable under heavy workloads!
---
be/src/common/config.cpp | 1 +
be/src/common/config.h | 4 +-
be/src/exec/common/memory.cpp | 90 ++++
be/src/exec/common/memory.h | 116 +++++
be/src/exec/operator/scan_operator.cpp | 27 +-
be/src/exec/operator/scan_operator.h | 4 +
be/src/exec/pipeline/pipeline_fragment_context.cpp | 2 -
be/src/exec/scan/scanner.cpp | 2 +-
be/src/exec/scan/scanner.h | 38 +-
be/src/exec/scan/scanner_context.cpp | 275 +++++++----
be/src/exec/scan/scanner_context.h | 139 +++++-
be/src/exec/scan/scanner_scheduler.cpp | 210 ++++----
be/src/runtime/query_context.cpp | 3 +
be/src/runtime/query_context.h | 3 +
be/test/exec/scan/scanner_context_test.cpp | 530 +++++++++++++++++++--
.../java/org/apache/doris/qe/SessionVariable.java | 8 +
gensrc/thrift/PaloInternalService.thrift | 4 +-
.../data/mv_p0/test_casewhen/test_casewhen.out | 2 +-
.../publish/test_partial_update_publish_seq.out | 38 +-
.../test_p_seq_publish_read_from_old.out | 10 +-
.../mv_p0/test_casewhen/test_casewhen.groovy | 2 +-
.../publish/test_partial_update_publish_seq.groovy | 8 +-
.../test_p_seq_publish_read_from_old.groovy | 4 +-
23 files changed, 1195 insertions(+), 325 deletions(-)
diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp
index 269e2d2e0cc..5227059c066 100644
--- a/be/src/common/config.cpp
+++ b/be/src/common/config.cpp
@@ -345,6 +345,7 @@ DEFINE_mInt32(doris_scanner_row_num, "16384");
DEFINE_mInt32(doris_scanner_row_bytes, "10485760");
// single read execute fragment max run time millseconds
DEFINE_mInt32(doris_scanner_max_run_time_ms, "1000");
+DEFINE_mInt32(doris_scanner_dynamic_interval_ms, "100");
// (Advanced) Maximum size of per-query receive-side buffer
DEFINE_mInt32(exchg_node_buffer_size_bytes, "20485760");
DEFINE_mInt32(exchg_buffer_queue_capacity_factor, "64");
diff --git a/be/src/common/config.h b/be/src/common/config.h
index 069483c4fb8..4a7c7a3634d 100644
--- a/be/src/common/config.h
+++ b/be/src/common/config.h
@@ -402,8 +402,10 @@ DECLARE_mInt32(doris_scan_range_max_mb);
DECLARE_mInt32(doris_scanner_row_num);
// single read execute fragment row bytes
DECLARE_mInt32(doris_scanner_row_bytes);
-// single read execute fragment max run time millseconds
+// Deprecated. single read execute fragment max run time millseconds
DECLARE_mInt32(doris_scanner_max_run_time_ms);
+// Minimum interval in milliseconds between adaptive scanner concurrency adjustments
+DECLARE_mInt32(doris_scanner_dynamic_interval_ms);
// (Advanced) Maximum size of per-query receive-side buffer
DECLARE_mInt32(exchg_node_buffer_size_bytes);
DECLARE_mInt32(exchg_buffer_queue_capacity_factor);
diff --git a/be/src/exec/common/memory.cpp b/be/src/exec/common/memory.cpp
new file mode 100644
index 00000000000..4ca4af037b3
--- /dev/null
+++ b/be/src/exec/common/memory.cpp
@@ -0,0 +1,90 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+// This file is copied from
+// https://github.com/ClickHouse/ClickHouse/blob/master/src/Common/formatIPv6.cpp
+// and modified by Doris
+
+#include "exec/common/memory.h"
+
+namespace doris {
+
+MemShareArbitrator::MemShareArbitrator(const TUniqueId& qid, int64_t query_mem_limit,
+ double max_scan_ratio)
+ : query_id(qid),
+ query_mem_limit(query_mem_limit),
+ mem_limit(std::max<int64_t>(
+ 1, static_cast<int64_t>(static_cast<double>(query_mem_limit) * max_scan_ratio))) {
+}
+
+void MemShareArbitrator::register_scan_node() {
+ total_mem_bytes.fetch_add(DEFAULT_SCANNER_MEM_BYTES);
+}
+
+int64_t MemShareArbitrator::update_mem_bytes(int64_t old_value, int64_t new_value) {
+ int64_t diff = new_value - old_value;
+ int64_t total = total_mem_bytes.fetch_add(diff) + diff;
+ if (new_value == 0) return 0;
+ if (total <= 0) return mem_limit;
+ // Proportional sharing: allocate based on this context's share of total usage
+ double ratio = static_cast<double>(new_value) / static_cast<double>(std::max(total, new_value));
+ return static_cast<int64_t>(static_cast<double>(mem_limit) * ratio);
+}
+
+// ==================== MemLimiter ====================
+int MemLimiter::available_scanner_count(int ins_idx) const {
+ int64_t mem_limit_value = mem_limit.load();
+ int64_t running_tasks_count_value = running_tasks_count.load();
+ int64_t estimated_block_mem_bytes_value = get_estimated_block_mem_bytes();
+ DCHECK_GT(estimated_block_mem_bytes_value, 0);
+
+ int64_t max_count = std::max(1L, mem_limit_value / estimated_block_mem_bytes_value);
+ int64_t avail_count = max_count;
+ int64_t per_count = avail_count / parallelism;
+ if (serial_operator) {
+ per_count += (avail_count - per_count * parallelism);
+ } else if (ins_idx < avail_count - per_count * parallelism) {
+ per_count += 1;
+ }
+
+ VLOG_DEBUG << "available_scanner_count. max_count=" << max_count << "("
+ << running_tasks_count_value << "/" << estimated_block_mem_bytes_value
+ << "), operator_mem_limit = " << operator_mem_limit
+ << ", running_tasks_count = " << running_tasks_count_value
+ << ", parallelism = " << parallelism << ", avail_count = " << avail_count
+ << ", ins_id = " << ins_idx << ", per_count = " << per_count
+ << " debug_string: " << debug_string();
+
+ return cast_set<int>(per_count);
+}
+
+void MemLimiter::reestimated_block_mem_bytes(int64_t value) {
+ if (value == 0) return;
+ value = std::min(value, operator_mem_limit);
+
+ std::lock_guard<std::mutex> L(lock);
+ auto old_value = estimated_block_mem_bytes.load();
+ int64_t total =
+ get_estimated_block_mem_bytes() * estimated_block_mem_bytes_update_count + value;
+ estimated_block_mem_bytes_update_count += 1;
+ estimated_block_mem_bytes = total / estimated_block_mem_bytes_update_count;
+ VLOG_DEBUG << fmt::format(
+ "reestimated_block_mem_bytes. MemLimiter = {}, estimated_block_mem_bytes = {}, "
+ "old_value = {}, value: {}",
+ debug_string(), estimated_block_mem_bytes, old_value, value);
+}
+
+} // namespace doris
diff --git a/be/src/exec/common/memory.h b/be/src/exec/common/memory.h
new file mode 100644
index 00000000000..de9a0aceaa5
--- /dev/null
+++ b/be/src/exec/common/memory.h
@@ -0,0 +1,116 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <fmt/format.h>
+#include <gen_cpp/Types_types.h>
+
+#include <atomic>
+#include <string>
+
+#include "common/cast_set.h"
+#include "common/factory_creator.h"
+#include "common/logging.h"
+#include "util/uid_util.h"
+
+namespace doris {
+static constexpr int64_t DEFAULT_SCANNER_MEM_BYTES = 64 * 1024 * 1024; // 64MB default
+
+// Query-level memory arbitrator that distributes memory fairly across all scan contexts
+struct MemShareArbitrator {
+ ENABLE_FACTORY_CREATOR(MemShareArbitrator)
+ TUniqueId query_id;
+ int64_t query_mem_limit = 0;
+ int64_t mem_limit = 0;
+ std::atomic<int64_t> total_mem_bytes = 0;
+
+ MemShareArbitrator(const TUniqueId& qid, int64_t query_mem_limit, double max_scan_ratio);
+
+ // Update memory allocation when scanner memory usage changes
+ // Returns new scan memory limit for this context
+ int64_t update_mem_bytes(int64_t old_value, int64_t new_value);
+ void register_scan_node();
+ std::string debug_string() const {
+ return fmt::format("query_id: {}, query_mem_limit: {}, mem_limit: {}", print_id(query_id),
+ query_mem_limit, mem_limit);
+ }
+};
+
+// Scan-context-level memory limiter that controls scanner concurrency based on memory
+struct MemLimiter {
+private:
+ TUniqueId query_id;
+ mutable std::mutex lock;
+ // Parallelism of the scan operator
+ const int64_t parallelism = 0;
+ const bool serial_operator = false;
+ const int64_t operator_mem_limit;
+ std::atomic<int64_t> running_tasks_count = 0;
+
+ std::atomic<int64_t> estimated_block_mem_bytes = 0;
+ int64_t estimated_block_mem_bytes_update_count = 0;
+ int64_t arb_mem_bytes = 0;
+ std::atomic<int64_t> open_tasks_count = 0;
+
+ // Memory limit for this scan node (shared by all instances), updated by memory share arbitrator
+ std::atomic<int64_t> mem_limit = 0;
+
+public:
+ ENABLE_FACTORY_CREATOR(MemLimiter)
+ MemLimiter(const TUniqueId& qid, int64_t parallelism, bool serial_operator_, int64_t mem_limit)
+ : query_id(qid),
+ parallelism(parallelism),
+ serial_operator(serial_operator_),
+ operator_mem_limit(mem_limit) {}
+ ~MemLimiter() { DCHECK_LE(open_tasks_count, 0); }
+
+ // Calculate available scanner count based on memory limit
+ int available_scanner_count(int ins_idx) const;
+
+ int64_t update_running_tasks_count(int delta) { return running_tasks_count += delta; }
+
+ // Re-estimated the average memory usage of a block, and update the estimated_block_mem_bytes accordingly.
+ void reestimated_block_mem_bytes(int64_t value);
+ void update_mem_limit(int64_t value) { mem_limit = value; }
+ // Update the memory usage of this context to memory share arbitrator.
+ // NOTE: This could be accessed by parallel tasks without synchronization, but it's fine
+ // since the memory share arbitrator will do proportional sharing based on the ratio of this
+ // context's memory usage to total memory usage, so even if there are some fluctuations in the
+ // memory usage, the overall proportional sharing will still work.
+ void update_arb_mem_bytes(int64_t value) {
+ value = std::min(value, operator_mem_limit);
+ arb_mem_bytes = value;
+ }
+ int64_t get_arb_scanner_mem_bytes() const { return arb_mem_bytes; }
+
+ int64_t get_estimated_block_mem_bytes() const { return estimated_block_mem_bytes; }
+
+ int64_t update_open_tasks_count(int delta) { return open_tasks_count.fetch_add(delta); }
+ std::string debug_string() const {
+ return fmt::format(
+ "query_id: {}, parallelism: {}, serial_operator: {}, operator_mem_limit: {}, "
+ "running_tasks_count: {}, estimated_block_mem_bytes: {}, "
+ "estimated_block_mem_bytes_update_count: {}, arb_mem_bytes: {}, "
+ "open_tasks_count: {}, mem_limit: {}",
+ print_id(query_id), parallelism, serial_operator, operator_mem_limit,
+ running_tasks_count.load(), estimated_block_mem_bytes.load(),
+ estimated_block_mem_bytes_update_count, arb_mem_bytes, open_tasks_count, mem_limit);
+ }
+};
+
+} // namespace doris
diff --git a/be/src/exec/operator/scan_operator.cpp b/be/src/exec/operator/scan_operator.cpp
index 1c0623c8ae0..251e4e437ca 100644
--- a/be/src/exec/operator/scan_operator.cpp
+++ b/be/src/exec/operator/scan_operator.cpp
@@ -149,6 +149,7 @@ Status ScanLocalState<Derived>::init(RuntimeState* state, LocalStateInfo& info)
set_scan_ranges(state, info.scan_ranges);
_wait_for_rf_timer = ADD_TIMER(common_profile(), "WaitForRuntimeFilter");
+ _instance_idx = info.task_idx;
return Status::OK();
}
@@ -1008,14 +1009,16 @@ template <typename Derived>
Status ScanLocalState<Derived>::_start_scanners(
const std::list<std::shared_ptr<ScannerDelegate>>& scanners) {
auto& p = _parent->cast<typename Derived::Parent>();
- _scanner_ctx.store(ScannerContext::create_shared(state(), this, p._output_tuple_desc,
- p.output_row_descriptor(), scanners, p.limit(),
- _scan_dependency, &p._shared_scan_limit
+ _scanner_ctx.store(ScannerContext::create_shared(
+ state(), this, p._output_tuple_desc, p.output_row_descriptor(), scanners, p.limit(),
+ _scan_dependency, &p._shared_scan_limit, p._mem_arb, p._mem_limiter, _instance_idx,
+ _state->get_query_ctx()->get_query_options().__isset.enable_adaptive_scan &&
+ _state->get_query_ctx()->get_query_options().enable_adaptive_scan
#ifdef BE_TEST
- ,
- max_scanners_concurrency(state())
+ ,
+ max_scanners_concurrency(state())
#endif
- ));
+ ));
return Status::OK();
}
@@ -1182,6 +1185,18 @@ Status ScanOperatorX<LocalStateType>::init(const TPlanNode& tnode, RuntimeState*
if (query_options.__isset.max_pushdown_conditions_per_column) {
_max_pushdown_conditions_per_column =
query_options.max_pushdown_conditions_per_column;
}
+#ifdef BE_TEST
+ _mem_arb = nullptr;
+#else
+ _mem_arb = state->get_query_ctx()->mem_arb();
+#endif
+ if (_mem_arb) {
+ _mem_arb->register_scan_node();
+ _mem_limiter =
+ MemLimiter::create_shared(state->query_id(), state->query_parallel_instance_num(),
+ OperatorX<LocalStateType>::is_serial_operator(),
+ state->get_query_ctx()->get_query_options().mem_limit);
+ }
// tnode.olap_scan_node.push_down_agg_type_opt field is deprecated
// Introduced a new field : tnode.push_down_agg_type_opt
//
diff --git a/be/src/exec/operator/scan_operator.h b/be/src/exec/operator/scan_operator.h
index f9b162be62d..36b3e4c5a0d 100644
--- a/be/src/exec/operator/scan_operator.h
+++ b/be/src/exec/operator/scan_operator.h
@@ -336,6 +336,7 @@ protected:
// ScanLocalState owns the ownership of scanner, scanner context only has its weakptr
std::list<std::shared_ptr<ScannerDelegate>> _scanners;
Arena _arena;
+ int _instance_idx = 0;
};
template <typename LocalStateType>
@@ -437,6 +438,9 @@ protected:
const int _parallel_tasks = 0;
std::vector<int> _topn_filter_source_node_ids;
+
+ std::shared_ptr<MemShareArbitrator> _mem_arb = nullptr;
+ std::shared_ptr<MemLimiter> _mem_limiter = nullptr;
};
#include "common/compile_check_end.h"
diff --git a/be/src/exec/pipeline/pipeline_fragment_context.cpp b/be/src/exec/pipeline/pipeline_fragment_context.cpp
index 40177f59aca..2386f6e3f8d 100644
--- a/be/src/exec/pipeline/pipeline_fragment_context.cpp
+++ b/be/src/exec/pipeline/pipeline_fragment_context.cpp
@@ -119,11 +119,9 @@
#include "load/stream_load/new_load_stream_mgr.h"
#include "runtime/exec_env.h"
#include "runtime/fragment_mgr.h"
-#include "runtime/result_block_buffer.h"
#include "runtime/result_buffer_mgr.h"
#include "runtime/runtime_state.h"
#include "runtime/thread_context.h"
-#include "service/backend_options.h"
#include "util/countdown_latch.h"
#include "util/debug_util.h"
#include "util/uid_util.h"
diff --git a/be/src/exec/scan/scanner.cpp b/be/src/exec/scan/scanner.cpp
index 7d7b4caeff9..d60ed9b5017 100644
--- a/be/src/exec/scan/scanner.cpp
+++ b/be/src/exec/scan/scanner.cpp
@@ -266,7 +266,7 @@ void Scanner::_collect_profile_before_close() {
_state->update_num_rows_load_unselected(_counter.num_rows_unselected);
}
-void Scanner::update_scan_cpu_timer() {
+void Scanner::_update_scan_cpu_timer() {
int64_t cpu_time = _cpu_watch.elapsed_time();
_scan_cpu_timer += cpu_time;
if (_state && _state->get_query_ctx()) {
diff --git a/be/src/exec/scan/scanner.h b/be/src/exec/scan/scanner.h
index 041af638c52..037a3d7936c 100644
--- a/be/src/exec/scan/scanner.h
+++ b/be/src/exec/scan/scanner.h
@@ -127,33 +127,41 @@ protected:
Status _do_projections(Block* origin_block, Block* output_block);
-public:
- int64_t get_time_cost_ns() const { return _per_scanner_timer; }
-
- int64_t projection_time() const { return _projection_timer; }
- int64_t get_rows_read() const { return _num_rows_read; }
-
- bool has_prepared() const { return _has_prepared; }
-
- Status try_append_late_arrival_runtime_filter();
-
+private:
// Call start_wait_worker_timer() when submit the scanner to the thread pool.
// And call update_wait_worker_timer() when it is actually being executed.
- void start_wait_worker_timer() {
+ void _start_wait_worker_timer() {
_watch.reset();
_watch.start();
}
- void start_scan_cpu_timer() {
+ void _start_scan_cpu_timer() {
_cpu_watch.reset();
_cpu_watch.start();
}
- void update_wait_worker_timer() { _scanner_wait_worker_timer += _watch.elapsed_time(); }
+ void _update_wait_worker_timer() { _scanner_wait_worker_timer += _watch.elapsed_time(); }
+ void _update_scan_cpu_timer();
- int64_t get_scanner_wait_worker_timer() const { return _scanner_wait_worker_timer; }
+public:
+ void resume() {
+ _update_wait_worker_timer();
+ _start_scan_cpu_timer();
+ }
+ void pause() {
+ _update_scan_cpu_timer();
+ _start_wait_worker_timer();
+ }
+ int64_t get_time_cost_ns() const { return _per_scanner_timer; }
+
+ int64_t projection_time() const { return _projection_timer; }
+ int64_t get_rows_read() const { return _num_rows_read; }
- void update_scan_cpu_timer();
+ bool has_prepared() const { return _has_prepared; }
+
+ Status try_append_late_arrival_runtime_filter();
+
+ int64_t get_scanner_wait_worker_timer() const { return _scanner_wait_worker_timer; }
// Some counters need to be updated realtime, for example, workload group policy need
// scan bytes to cancel the query exceed limit.
diff --git a/be/src/exec/scan/scanner_context.cpp b/be/src/exec/scan/scanner_context.cpp
index f35c6e1732a..f758d0379b8 100644
--- a/be/src/exec/scan/scanner_context.cpp
+++ b/be/src/exec/scan/scanner_context.cpp
@@ -52,12 +52,17 @@ namespace doris {
using namespace std::chrono_literals;
#include "common/compile_check_begin.h"
+
+// ==================== ScannerContext ====================
 ScannerContext::ScannerContext(RuntimeState* state, ScanLocalStateBase* local_state,
 const TupleDescriptor* output_tuple_desc,
 const RowDescriptor* output_row_descriptor,
 const std::list<std::shared_ptr<ScannerDelegate>>& scanners,
 int64_t limit_, std::shared_ptr<Dependency> dependency,
- std::atomic<int64_t>* shared_scan_limit
+ std::atomic<int64_t>* shared_scan_limit,
+ std::shared_ptr<MemShareArbitrator> arb,
+ std::shared_ptr<MemLimiter> limiter, int ins_idx,
+ bool enable_adaptive_scan
#ifdef BE_TEST
,
int num_parallel_instances
@@ -85,7 +90,11 @@ ScannerContext::ScannerContext(RuntimeState* state, ScanLocalStateBase* local_st
_min_scan_concurrency_of_scan_scheduler(0),
_max_scan_concurrency(num_parallel_instances),
#endif
- _min_scan_concurrency(local_state->min_scanners_concurrency(state)) {
+ _min_scan_concurrency(local_state->min_scanners_concurrency(state)),
+ _scanner_mem_limiter(limiter),
+ _mem_share_arb(arb),
+ _ins_idx(ins_idx),
+ _enable_adaptive_scanners(enable_adaptive_scan) {
DCHECK(_state != nullptr);
DCHECK(_output_row_descriptor == nullptr ||
_output_row_descriptor->tuple_descriptors().size() == 1);
@@ -93,12 +102,14 @@ ScannerContext::ScannerContext(RuntimeState* state, ScanLocalStateBase* local_st
_resource_ctx = _state->get_query_ctx()->resource_ctx();
ctx_id = UniqueId::gen_uid().to_string();
for (auto& scanner : _all_scanners) {
- _pending_scanners.push(std::make_shared<ScanTask>(scanner));
- };
+ _pending_tasks.push(std::make_shared<ScanTask>(scanner));
+ }
if (limit < 0) {
limit = -1;
}
_dependency = dependency;
+ // Initialize adaptive processor
+ _adaptive_processor = ScannerAdaptiveProcessor::create_shared();
DorisMetrics::instance()->scanner_ctx_cnt->increment(1);
}
@@ -123,6 +134,57 @@ int64_t ScannerContext::acquire_limit_quota(int64_t desired) {
}
}
+void ScannerContext::_adjust_scan_mem_limit(int64_t old_value, int64_t new_value) {
+ if (!_enable_adaptive_scanners) {
+ return;
+ }
+
+ int64_t new_scan_mem_limit = _mem_share_arb->update_mem_bytes(old_value, new_value);
+ _scanner_mem_limiter->update_mem_limit(new_scan_mem_limit);
+ _scanner_mem_limiter->update_arb_mem_bytes(new_value);
+
+ VLOG_DEBUG << fmt::format(
+ "adjust_scan_mem_limit. context = {}, new mem scan limit = {}, scanner mem bytes = {} "
+ "-> {}",
+ debug_string(), new_scan_mem_limit, old_value, new_value);
+}
+
+int ScannerContext::_available_pickup_scanner_count() {
+ if (!_enable_adaptive_scanners) {
+ return _max_scan_concurrency;
+ }
+
+ int min_scanners = std::max(1, _min_scan_concurrency);
+ int max_scanners = _scanner_mem_limiter->available_scanner_count(_ins_idx);
+ max_scanners = std::min(max_scanners, _max_scan_concurrency);
+ min_scanners = std::min(min_scanners, max_scanners);
+ if (_ins_idx == 0) {
+ // Adjust memory limit via memory share arbitrator
+ _adjust_scan_mem_limit(_scanner_mem_limiter->get_arb_scanner_mem_bytes(),
+ _scanner_mem_limiter->get_estimated_block_mem_bytes());
+ }
+
+ ScannerAdaptiveProcessor& P = *_adaptive_processor;
+ int& scanners = P.expected_scanners;
+ int64_t now = UnixMillis();
+ // Avoid frequent adjustment - only adjust every 100ms
+ if (now - P.adjust_scanners_last_timestamp <= config::doris_scanner_dynamic_interval_ms) {
+ return scanners;
+ }
+ P.adjust_scanners_last_timestamp = now;
+ auto old_scanners = P.expected_scanners;
+
+ scanners = std::max(min_scanners, scanners);
+ scanners = std::min(max_scanners, scanners);
+ VLOG_DEBUG << fmt::format(
+ "_available_pickup_scanner_count. context = {}, old_scanners = {}, scanners = {} "
+ ", min_scanners: {}, max_scanners: {}",
+ debug_string(), old_scanners, scanners, min_scanners, max_scanners);
+
+ // TODO(gabriel): Scanners are scheduled adaptively based on the memory usage now.
+ return scanners;
+}
+
// After init function call, should not access _parent
Status ScannerContext::init() {
#ifndef BE_TEST
@@ -168,6 +230,20 @@ Status ScannerContext::init() {
_set_scanner_done();
}
+ // Initialize memory limiter if memory-aware scheduling is enabled
+ if (_enable_adaptive_scanners) {
+ DCHECK(_scanner_mem_limiter && _mem_share_arb);
+ int64_t c = _scanner_mem_limiter->update_open_tasks_count(1);
+ // TODO(gabriel): set estimated block size
+ _scanner_mem_limiter->reestimated_block_mem_bytes(DEFAULT_SCANNER_MEM_BYTES);
+ _scanner_mem_limiter->update_arb_mem_bytes(DEFAULT_SCANNER_MEM_BYTES);
+ if (c == 0) {
+ // First scanner context to open, adjust scan memory limit
+ _adjust_scan_mem_limit(DEFAULT_SCANNER_MEM_BYTES,
+ _scanner_mem_limiter->get_arb_scanner_mem_bytes());
+ }
+ }
+
// when user not specify scan_thread_num, so we can try downgrade _max_thread_num.
// becaue we found in a table with 5k columns, column reader may ocuppy too much memory.
// you can refer https://github.com/apache/doris/issues/35340 for details.
@@ -201,13 +277,22 @@ Status ScannerContext::init() {
ScannerContext::~ScannerContext() {
SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(_resource_ctx->memory_context()->mem_tracker());
- _tasks_queue.clear();
+ _completed_tasks.clear();
BlockUPtr block;
while (_free_blocks.try_dequeue(block)) {
// do nothing
}
block.reset();
DorisMetrics::instance()->scanner_ctx_cnt->increment(-1);
+
+    // Cleanup memory limiter if this is the last context to close
+    if (_enable_adaptive_scanners) {
+        if (_scanner_mem_limiter->update_open_tasks_count(-1) == 1) {
+            // Last scanner context to close, reset scan memory limit
+            _adjust_scan_mem_limit(_scanner_mem_limiter->get_arb_scanner_mem_bytes(), 0);
+        }
+    }
+
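The init/destructor pair above detects the first context to open (`update_open_tasks_count(1)` returned 0) and the last context to close (`update_open_tasks_count(-1)` returned 1). A sketch of that fetch-then-modify pattern; `OpenTaskCounter` is an illustrative stand-in for the MemLimiter counter, not the patch's API:

```cpp
#include <atomic>
#include <cstdint>

// Returns the value *before* the update, like std::atomic::fetch_add, which is
// what lets callers detect the first opener and the last closer race-free.
class OpenTaskCounter {
public:
    int64_t update(int64_t delta) { return _count.fetch_add(delta); }

private:
    std::atomic<int64_t> _count {0};
};
```

Because the previous value is returned atomically with the update, exactly one thread observes the 0-to-1 (first open) and 1-to-0 (last close) transitions, even when many contexts open and close concurrently.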
if (_task_handle) {
if (auto* task_executor_scheduler =
dynamic_cast<TaskExecutorSimplifiedScanScheduler*>(_scanner_scheduler)) {
@@ -253,7 +338,7 @@ Status ScannerContext::submit_scan_task(std::shared_ptr<ScanTask> scan_task,
     // and _num_finished_scanners will be reduced.
     // if submit succeeds, it will also be added back by ScannerContext::push_back_scan_task,
     // see ScannerScheduler::_scanner_scan.
- _num_scheduled_scanners++;
+ _in_flight_tasks_num++;
return _scanner_scheduler->submit(shared_from_this(), scan_task);
}
@@ -263,13 +348,10 @@ void ScannerContext::clear_free_blocks() {
void ScannerContext::push_back_scan_task(std::shared_ptr<ScanTask> scan_task) {
if (scan_task->status_ok()) {
- for (const auto& [block, _] : scan_task->cached_blocks) {
- if (block->rows() > 0) {
- Status st = validate_block_schema(block.get());
- if (!st.ok()) {
- scan_task->set_status(st);
- break;
- }
+ if (scan_task->cached_block && scan_task->cached_block->rows() > 0) {
+ Status st = validate_block_schema(scan_task->cached_block.get());
+ if (!st.ok()) {
+ scan_task->set_status(st);
}
}
}
@@ -278,8 +360,8 @@ void ScannerContext::push_back_scan_task(std::shared_ptr<ScanTask> scan_task) {
if (!scan_task->status_ok()) {
_process_status = scan_task->get_status();
}
- _tasks_queue.push_back(scan_task);
- _num_scheduled_scanners--;
+ _completed_tasks.push_back(scan_task);
+ _in_flight_tasks_num--;
_dependency->set_ready();
}
@@ -298,10 +380,11 @@ Status ScannerContext::get_block_from_queue(RuntimeState* state, Block* block, b
std::shared_ptr<ScanTask> scan_task = nullptr;
- if (!_tasks_queue.empty() && !done()) {
+ if (!_completed_tasks.empty() && !done()) {
// https://en.cppreference.com/w/cpp/container/list/front
// The behavior is undefined if the list is empty.
- scan_task = _tasks_queue.front();
+ scan_task = _completed_tasks.front();
+ _completed_tasks.pop_front();
}
if (scan_task != nullptr) {
@@ -314,53 +397,44 @@ Status ScannerContext::get_block_from_queue(RuntimeState* state, Block* block, b
return _process_status;
}
-        // No need to worry about small block, block is merged together when they are appended to cached_blocks.
-        if (!scan_task->cached_blocks.empty()) {
-            auto [current_block, block_size] = std::move(scan_task->cached_blocks.front());
-            scan_task->cached_blocks.pop_front();
+        if (scan_task->cached_block) {
+            // Each scan task now carries at most one cached block, so there is no
+            // small-block merging to worry about here.
+            auto current_block = std::move(scan_task->cached_block);
+            auto block_size = current_block->allocated_bytes();
+            scan_task->cached_block.reset();
_block_memory_usage -= block_size;
// consume current block
block->swap(*current_block);
return_free_block(std::move(current_block));
}
         VLOG_DEBUG << fmt::format(
-                "ScannerContext {} get block from queue, task_queue size {}, current scan "
-                "task remaining cached_block size {}, eos {}, scheduled tasks {}",
-                ctx_id, _tasks_queue.size(), scan_task->cached_blocks.size(), scan_task->is_eos(),
-                _num_scheduled_scanners);
-
-        if (scan_task->cached_blocks.empty()) {
-            // All cached blocks are consumed, pop this task from the task queue.
-            if (!_tasks_queue.empty()) {
-                _tasks_queue.pop_front();
-            }
-
-            if (scan_task->is_eos()) {
-                // 1. if eos, record a finished scanner.
-                _num_finished_scanners++;
-                RETURN_IF_ERROR(
-                        _scanner_scheduler->schedule_scan_task(shared_from_this(), nullptr, l));
-            } else {
-                RETURN_IF_ERROR(
-                        _scanner_scheduler->schedule_scan_task(shared_from_this(), scan_task, l));
-            }
+                "ScannerContext {} get block from queue, completed tasks {}, eos {}, "
+                "in-flight tasks {}",
+                ctx_id, _completed_tasks.size(), scan_task->is_eos(), _in_flight_tasks_num);
+        if (scan_task->is_eos()) {
+            // 1. if eos, record a finished scanner.
+            _num_finished_scanners++;
+            RETURN_IF_ERROR(_scanner_scheduler->schedule_scan_task(shared_from_this(), nullptr, l));
+        } else {
+            scan_task->set_state(ScanTask::State::IN_FLIGHT);
+            RETURN_IF_ERROR(
+                    _scanner_scheduler->schedule_scan_task(shared_from_this(), scan_task, l));
         }
     }
// Mark finished when either:
// (1) all scanners completed normally, or
// (2) shared limit exhausted and no scanners are still running.
-    if (_tasks_queue.empty() && (_num_finished_scanners == _all_scanners.size() ||
-                                 (_shared_scan_limit->load(std::memory_order_acquire) == 0 &&
-                                  _num_scheduled_scanners == 0))) {
+    if (_completed_tasks.empty() &&
+        (_num_finished_scanners == _all_scanners.size() ||
+         (_shared_scan_limit->load(std::memory_order_acquire) == 0 && _in_flight_tasks_num == 0))) {
_set_scanner_done();
_is_finished = true;
}
*eos = done();
- if (_tasks_queue.empty()) {
+ if (_completed_tasks.empty()) {
_dependency->block();
}
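The finish condition above can be stated as a pure predicate: the context is done when no completed task is waiting AND either every scanner reached EOS, or the shared row-limit quota is exhausted with nothing still in flight. A standalone sketch (parameter names are illustrative, not from the patch):

```cpp
#include <cstddef>
#include <cstdint>

// Mirrors the done-check in get_block_from_queue as a side-effect-free predicate.
bool scan_finished(size_t completed_tasks, size_t finished_scanners, size_t total_scanners,
                   int64_t remaining_limit, int in_flight_tasks) {
    return completed_tasks == 0 &&
           (finished_scanners == total_scanners ||
            (remaining_limit == 0 && in_flight_tasks == 0));
}
```

Requiring `in_flight_tasks == 0` on the limit-exhausted path matters: an in-flight task may still push a completed block, so declaring the context finished earlier could drop data or free state a scanner thread is about to touch.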
@@ -403,7 +477,7 @@ void ScannerContext::stop_scanners(RuntimeState* state) {
sc->_scanner->try_stop();
}
}
- _tasks_queue.clear();
+ _completed_tasks.clear();
if (_task_handle) {
if (auto* task_executor_scheduler =
dynamic_cast<TaskExecutorSimplifiedScanScheduler*>(_scanner_scheduler)) {
@@ -460,14 +534,19 @@ void ScannerContext::stop_scanners(RuntimeState* state) {
std::string ScannerContext::debug_string() {
return fmt::format(
-            "id: {}, total scanners: {}, pending tasks: {},"
+            "_query_id: {}, id: {}, total scanners: {}, pending tasks: {}, completed tasks: {},"
             " _should_stop: {}, _is_finished: {}, free blocks: {},"
-            " limit: {}, remaining_limit: {}, _num_running_scanners: {}, _max_thread_num: {},"
-            " _max_bytes_in_queue: {}, query_id: {}",
-            ctx_id, _all_scanners.size(), _tasks_queue.size(), _should_stop, _is_finished,
-            _free_blocks.size_approx(), limit, _shared_scan_limit->load(std::memory_order_relaxed),
-            _num_scheduled_scanners, _max_scan_concurrency, _max_bytes_in_queue,
-            print_id(_query_id));
+            " limit: {}, remaining_limit: {}, _in_flight_tasks_num: {}, _num_finished_scanners: {},"
+            " _max_scan_concurrency: {}, _max_bytes_in_queue: {}, _ins_idx: {},"
+            " _enable_adaptive_scanners: {}, _mem_share_arb: {}, _scanner_mem_limiter: {}",
+            print_id(_query_id), ctx_id, _all_scanners.size(), _pending_tasks.size(),
+            _completed_tasks.size(), _should_stop, _is_finished, _free_blocks.size_approx(), limit,
+            _shared_scan_limit->load(std::memory_order_relaxed), _in_flight_tasks_num,
+            _num_finished_scanners, _max_scan_concurrency, _max_bytes_in_queue, _ins_idx,
+            _enable_adaptive_scanners,
+            _enable_adaptive_scanners ? _mem_share_arb->debug_string() : "NULL",
+            _enable_adaptive_scanners ? _scanner_mem_limiter->debug_string() : "NULL");
}
void ScannerContext::_set_scanner_done() {
@@ -475,38 +554,64 @@ void ScannerContext::_set_scanner_done() {
}
void ScannerContext::update_peak_running_scanner(int num) {
+#ifndef BE_TEST
_local_state->_peak_running_scanner->add(num);
+#endif
+ if (_enable_adaptive_scanners) {
+ _scanner_mem_limiter->update_running_tasks_count(num);
+ }
+}
+
+void ScannerContext::reestimated_block_mem_bytes(int64_t num) {
+ if (_enable_adaptive_scanners) {
+ _scanner_mem_limiter->reestimated_block_mem_bytes(num);
+ }
}
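`reestimated_block_mem_bytes` feeds every observed block size into the limiter, which (per the commit message) maintains a running average of memory per block. One common way to maintain such an estimate is an exponential moving average; the sketch below is an assumed formulation for illustration, not the exact update rule in the patch:

```cpp
#include <atomic>
#include <cstdint>

// Illustrative running estimate of per-block memory, assuming an EMA with
// weight 1/4 on each new observation. The real MemLimiter's rule may differ.
class BlockMemEstimator {
public:
    explicit BlockMemEstimator(int64_t initial) : _estimate(initial) {}

    // Blend the new observation into the running estimate.
    void observe(int64_t block_bytes) {
        int64_t cur = _estimate.load(std::memory_order_relaxed);
        _estimate.store(cur - cur / 4 + block_bytes / 4, std::memory_order_relaxed);
    }

    int64_t estimate() const { return _estimate.load(std::memory_order_relaxed); }

private:
    std::atomic<int64_t> _estimate;
};
```

A smoothed estimate keeps one unusually wide or narrow block from drastically changing the number of scanners the limiter admits.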
 int32_t ScannerContext::_get_margin(std::unique_lock<std::mutex>& transfer_lock,
                                     std::unique_lock<std::shared_mutex>& scheduler_lock) {
+    // Get effective max concurrency considering adaptive scheduling
+    int32_t effective_max_concurrency = _available_pickup_scanner_count();
+    DCHECK_LE(effective_max_concurrency, _max_scan_concurrency);
+
     // margin_1 is used to ensure each scan operator could have at least _min_scan_concurrency scan tasks.
     int32_t margin_1 = _min_scan_concurrency -
-                       (cast_set<int32_t>(_tasks_queue.size()) + _num_scheduled_scanners);
+                       (cast_set<int32_t>(_completed_tasks.size()) + _in_flight_tasks_num);
     // margin_2 is used to ensure the scan scheduler could have at least _min_scan_concurrency_of_scan_scheduler scan tasks.
     int32_t margin_2 =
             _min_scan_concurrency_of_scan_scheduler -
             (_scanner_scheduler->get_active_threads() + _scanner_scheduler->get_queue_size());
+    // margin_3 is used to respect the adaptive max concurrency limit
+    int32_t margin_3 =
+            std::max(effective_max_concurrency -
+                             (cast_set<int32_t>(_completed_tasks.size()) + _in_flight_tasks_num),
+                     1);
+
     if (margin_1 <= 0 && margin_2 <= 0) {
         return 0;
     }
     int32_t margin = std::max(margin_1, margin_2);
+    if (_enable_adaptive_scanners) {
+        margin = std::min(margin, margin_3); // Cap by adaptive limit
+    }
     if (low_memory_mode()) {
         // In low memory mode, we will limit the number of running scanners to
         // `low_memory_mode_scanners()`, so that we will not submit too many scan tasks to the scheduler.
-        margin = std::min(low_memory_mode_scanners() - _num_scheduled_scanners, margin);
+        margin = std::min(low_memory_mode_scanners() - _in_flight_tasks_num, margin);
     }
     VLOG_DEBUG << fmt::format(
             "[{}|{}] schedule scan task, margin_1: {} = {} - ({} + {}), margin_2: {} = {} - "
-            "({} + {}), margin: {}",
-            print_id(_query_id), ctx_id, margin_1, _min_scan_concurrency, _tasks_queue.size(),
-            _num_scheduled_scanners, margin_2, _min_scan_concurrency_of_scan_scheduler,
-            _scanner_scheduler->get_active_threads(), _scanner_scheduler->get_queue_size(), margin);
+            "({} + {}), margin_3: {} = {} - ({} + {}), margin: {}, adaptive: {}",
+            print_id(_query_id), ctx_id, margin_1, _min_scan_concurrency, _completed_tasks.size(),
+            _in_flight_tasks_num, margin_2, _min_scan_concurrency_of_scan_scheduler,
+            _scanner_scheduler->get_active_threads(), _scanner_scheduler->get_queue_size(),
+            margin_3, effective_max_concurrency, _completed_tasks.size(), _in_flight_tasks_num,
+            margin, _enable_adaptive_scanners);
return margin;
}
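The three-margin logic in `_get_margin` can be sketched standalone: margin_1 keeps each operator at its minimum concurrency, margin_2 keeps the global scheduler busy, and margin_3 caps submissions at the adaptive limit. All parameter names below are illustrative, not the patch's members:

```cpp
#include <algorithm>
#include <cstdint>

// occupied_slots stands in for completed_tasks.size() + in_flight_tasks_num;
// scheduler_load for active_threads + queue_size.
int32_t compute_margin(int32_t min_concurrency, int32_t occupied_slots,
                       int32_t scheduler_min, int32_t scheduler_load,
                       int32_t adaptive_max, bool adaptive_enabled) {
    int32_t margin_1 = min_concurrency - occupied_slots;
    int32_t margin_2 = scheduler_min - scheduler_load;
    // Always allow at least 1 so the operator cannot starve under the cap.
    int32_t margin_3 = std::max(adaptive_max - occupied_slots, 1);
    if (margin_1 <= 0 && margin_2 <= 0) {
        return 0; // both minimums already satisfied, nothing to submit
    }
    int32_t margin = std::max(margin_1, margin_2);
    if (adaptive_enabled) {
        margin = std::min(margin, margin_3); // cap by the adaptive limit
    }
    return margin;
}
```

Note the `std::max(..., 1)` floor on margin_3: even when memory pressure drives the adaptive limit below the occupied slots, one task may still be submitted, matching the invariant that each scan operator always keeps at least one scan task alive.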
@@ -518,7 +623,7 @@ Status ScannerContext::schedule_scan_task(std::shared_ptr<ScanTask> current_scan
                                           std::unique_lock<std::mutex>& transfer_lock,
                                           std::unique_lock<std::shared_mutex>& scheduler_lock) {
     if (current_scan_task &&
-        (!current_scan_task->cached_blocks.empty() || current_scan_task->is_eos())) {
+        (current_scan_task->cached_block != nullptr || current_scan_task->is_eos())) {
         throw doris::Exception(ErrorCode::INTERNAL_ERROR, "Scanner scheduler logical error.");
     }
@@ -532,18 +637,19 @@ Status ScannerContext::schedule_scan_task(std::shared_ptr<ScanTask> current_scan
     // We need to add it back to task queue to make sure it could be resubmitted.
     if (current_scan_task) {
         // This usually happens when we should downgrade the concurrency.
-        _pending_scanners.push(current_scan_task);
+        current_scan_task->set_state(ScanTask::State::PENDING);
+        _pending_tasks.push(current_scan_task);
         VLOG_DEBUG << fmt::format(
-                "{} push back scanner to task queue, because diff <= 0, task_queue size "
-                "{}, _num_scheduled_scanners {}",
-                ctx_id, _tasks_queue.size(), _num_scheduled_scanners);
+                "{} push back scanner to task queue, because diff <= 0, _completed_tasks size "
+                "{}, _in_flight_tasks_num {}",
+                ctx_id, _completed_tasks.size(), _in_flight_tasks_num);
     }
 #ifndef NDEBUG
     // This DCHECK is necessary.
     // We need to make sure each scan operator could have at least 1 scan task.
     // Or this scan operator will not be re-scheduled.
-    if (!_pending_scanners.empty() && _num_scheduled_scanners == 0 && _tasks_queue.empty()) {
+    if (!_pending_tasks.empty() && _in_flight_tasks_num == 0 && _completed_tasks.empty()) {
         throw doris::Exception(ErrorCode::INTERNAL_ERROR, "Scanner scheduler logical error.");
     }
 #endif
@@ -556,10 +662,10 @@ Status ScannerContext::schedule_scan_task(std::shared_ptr<ScanTask> current_scan
     while (margin-- > 0) {
         std::shared_ptr<ScanTask> task_to_run;
         const int32_t current_concurrency = cast_set<int32_t>(
-                _tasks_queue.size() + _num_scheduled_scanners + tasks_to_submit.size());
+                _completed_tasks.size() + _in_flight_tasks_num + tasks_to_submit.size());
         VLOG_DEBUG << fmt::format("{} current concurrency: {} = {} + {} + {}", ctx_id,
-                                  current_concurrency, _tasks_queue.size(), _num_scheduled_scanners,
-                                  tasks_to_submit.size());
+                                  current_concurrency, _completed_tasks.size(),
+                                  _in_flight_tasks_num, tasks_to_submit.size());
         if (first_pull) {
             task_to_run = _pull_next_scan_task(current_scan_task, current_concurrency);
             if (task_to_run == nullptr) {
@@ -567,16 +673,16 @@ Status ScannerContext::schedule_scan_task(std::shared_ptr<ScanTask> current_scan
                 // 1. current_concurrency already reached _max_scan_concurrency.
                 // 2. all scanners are finished.
                 if (current_scan_task) {
-                    DCHECK(current_scan_task->cached_blocks.empty());
+                    DCHECK(current_scan_task->cached_block == nullptr);
                     DCHECK(!current_scan_task->is_eos());
-                    if (!current_scan_task->cached_blocks.empty() || current_scan_task->is_eos()) {
+                    if (current_scan_task->cached_block != nullptr || current_scan_task->is_eos()) {
                         // This should not happen.
                         throw doris::Exception(ErrorCode::INTERNAL_ERROR,
                                                "Scanner scheduler logical error.");
                     }
-                    // Current scan task is not eos, but we can not resubmit it.
-                    // Add current_scan_task back to task queue, so that we have chance to resubmit it in the future.
-                    _pending_scanners.push(current_scan_task);
+                    // Current scan task is not scheduled, we need to add it back to the task
+                    // queue to make sure it could be resubmitted.
+                    current_scan_task->set_state(ScanTask::State::PENDING);
+                    _pending_tasks.push(current_scan_task);
                 }
             }
first_pull = false;
@@ -597,7 +703,7 @@ Status ScannerContext::schedule_scan_task(std::shared_ptr<ScanTask> current_scan
     VLOG_DEBUG << fmt::format("[{}:{}] submit {} scan tasks to scheduler, remaining scanner: {}",
                               print_id(_query_id), ctx_id, tasks_to_submit.size(),
-                              _pending_scanners.size());
+                              _pending_tasks.size());
for (auto& scan_task_iter : tasks_to_submit) {
Status submit_status = submit_scan_task(scan_task_iter, transfer_lock);
@@ -613,31 +719,38 @@ Status ScannerContext::schedule_scan_task(std::shared_ptr<ScanTask> current_scan
 std::shared_ptr<ScanTask> ScannerContext::_pull_next_scan_task(
         std::shared_ptr<ScanTask> current_scan_task, int32_t current_concurrency) {
-    if (current_concurrency >= _max_scan_concurrency) {
+    int32_t effective_max_concurrency = _max_scan_concurrency;
+    if (_enable_adaptive_scanners) {
+        effective_max_concurrency = _adaptive_processor->expected_scanners > 0
+                                            ? _adaptive_processor->expected_scanners
+                                            : _max_scan_concurrency;
+    }
+
+    if (current_concurrency >= effective_max_concurrency) {
         VLOG_DEBUG << fmt::format(
-                "ScannerContext {} current concurrency {} >= _max_scan_concurrency {}, skip "
+                "ScannerContext {} current concurrency {} >= effective_max_concurrency {}, skip "
                 "pull",
-                ctx_id, current_concurrency, _max_scan_concurrency);
+                ctx_id, current_concurrency, effective_max_concurrency);
         return nullptr;
     }
     if (current_scan_task != nullptr) {
-        if (!current_scan_task->cached_blocks.empty() || current_scan_task->is_eos()) {
+        if (current_scan_task->cached_block != nullptr || current_scan_task->is_eos()) {
             // This should not happen.
             throw doris::Exception(ErrorCode::INTERNAL_ERROR, "Scanner scheduler logical error.");
         }
         return current_scan_task;
     }
-    if (!_pending_scanners.empty()) {
+    if (!_pending_tasks.empty()) {
         // If shared limit quota is exhausted, do not submit new scanners from pending queue.
         int64_t remaining = _shared_scan_limit->load(std::memory_order_acquire);
         if (remaining == 0) {
             return nullptr;
         }
         std::shared_ptr<ScanTask> next_scan_task;
-        next_scan_task = _pending_scanners.top();
-        _pending_scanners.pop();
+        next_scan_task = _pending_tasks.top();
+        _pending_tasks.pop();
         return next_scan_task;
} else {
return nullptr;
diff --git a/be/src/exec/scan/scanner_context.h b/be/src/exec/scan/scanner_context.h
index 1aea5da03c9..d3b61c5f8c0 100644
--- a/be/src/exec/scan/scanner_context.h
+++ b/be/src/exec/scan/scanner_context.h
@@ -36,6 +36,7 @@
#include "common/status.h"
#include "concurrentqueue.h"
#include "core/block/block.h"
+#include "exec/common/memory.h"
#include "exec/scan/scanner.h"
#include "exec/scan/task_executor/split_runner.h"
#include "runtime/runtime_profile.h"
@@ -52,12 +53,49 @@ class Dependency;
class Scanner;
class ScannerDelegate;
class ScannerScheduler;
-class ScannerScheduler;
class TaskExecutor;
class TaskHandle;
+// Adaptive processor for dynamic scanner concurrency adjustment
+struct ScannerAdaptiveProcessor {
+ ENABLE_FACTORY_CREATOR(ScannerAdaptiveProcessor)
+ ScannerAdaptiveProcessor() = default;
+ ~ScannerAdaptiveProcessor() = default;
+    // Expected number of scanners in this scheduling cycle
+    int expected_scanners = 0;
+ // Timing metrics
+ // int64_t context_start_time = 0;
+ // int64_t scanner_total_halt_time = 0;
+ // int64_t scanner_gen_blocks_time = 0;
+ // std::atomic_int64_t scanner_total_io_time = 0;
+ // std::atomic_int64_t scanner_total_running_time = 0;
+ // std::atomic_int64_t scanner_total_scan_bytes = 0;
+
+ // Timestamps
+ // std::atomic_int64_t last_scanner_finish_timestamp = 0;
+ // int64_t check_all_scanners_last_timestamp = 0;
+ // int64_t last_driver_output_full_timestamp = 0;
+ int64_t adjust_scanners_last_timestamp = 0;
+
+ // Adjustment strategy fields
+ // bool try_add_scanners = false;
+ // double expected_speedup_ratio = 0;
+ // double last_scanner_scan_speed = 0;
+ // int64_t last_scanner_total_scan_bytes = 0;
+ // int try_add_scanners_fail_count = 0;
+ // int check_slow_io = 0;
+ // int32_t slow_io_latency_ms = 100; // Default from config
+};
+
class ScanTask {
public:
+    enum class State : int {
+        PENDING,    // not scheduled yet
+        IN_FLIGHT,  // scheduled and running
+        COMPLETED,  // finished with a result or error, waiting to be collected by the scan node
+        EOS,        // finished with no more data, waiting to be collected by the scan node
+    };
     ScanTask(std::weak_ptr<ScannerDelegate> delegate_scanner) : scanner(delegate_scanner) {
         _resource_ctx = thread_context()->resource_ctx();
DorisMetrics::instance()->scanner_task_cnt->increment(1);
@@ -65,19 +103,19 @@ public:
~ScanTask() {
SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(_resource_ctx->memory_context()->mem_tracker());
- cached_blocks.clear();
DorisMetrics::instance()->scanner_task_cnt->increment(-1);
+ cached_block.reset();
}
private:
// whether current scanner is finished
- bool eos = false;
Status status = Status::OK();
std::shared_ptr<ResourceContext> _resource_ctx;
+ State _state = State::PENDING;
public:
std::weak_ptr<ScannerDelegate> scanner;
- std::list<std::pair<BlockUPtr, size_t>> cached_blocks;
+ BlockUPtr cached_block = nullptr;
bool is_first_schedule = true;
     // Use weak_ptr to avoid circular references and potential memory leaks with SplitRunner.
     // ScannerContext only needs to observe the lifetime of SplitRunner without owning it.
@@ -87,14 +125,39 @@ public:
void set_status(Status _status) {
if (_status.is<ErrorCode::END_OF_FILE>()) {
// set `eos` if `END_OF_FILE`, don't take `END_OF_FILE` as error
- eos = true;
+ _state = State::EOS;
}
status = _status;
}
Status get_status() const { return status; }
     bool status_ok() { return status.ok() || status.is<ErrorCode::END_OF_FILE>(); }
- bool is_eos() const { return eos; }
- void set_eos(bool _eos) { eos = _eos; }
+ bool is_eos() const { return _state == State::EOS; }
+    void set_state(State state) {
+        switch (state) {
+        case State::PENDING:
+            DCHECK(_state == State::PENDING || _state == State::IN_FLIGHT) << (int)_state;
+            DCHECK(cached_block == nullptr);
+            break;
+        case State::IN_FLIGHT:
+            DCHECK(_state == State::COMPLETED || _state == State::PENDING ||
+                   _state == State::IN_FLIGHT)
+                    << (int)_state;
+            DCHECK(cached_block == nullptr);
+            break;
+        case State::COMPLETED:
+            DCHECK(_state == State::IN_FLIGHT) << (int)_state;
+            DCHECK(cached_block != nullptr);
+            break;
+        case State::EOS:
+            DCHECK(_state == State::IN_FLIGHT || status.is<ErrorCode::END_OF_FILE>())
+                    << (int)_state;
+            break;
+        default:
+            break;
+        }
+
+        _state = state;
+    }
};
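The DCHECKs in `set_state` encode a small state machine. A standalone sketch of the allowed transitions (the EOS-via-`END_OF_FILE` status exception is omitted here; names are illustrative, not the patch's types):

```cpp
// Allowed transitions mirror set_state's DCHECKs: PENDING <-> IN_FLIGHT,
// IN_FLIGHT -> COMPLETED, IN_FLIGHT -> EOS, plus COMPLETED -> IN_FLIGHT
// when a consumed task is resubmitted to the scheduler.
enum class TaskState { PENDING, IN_FLIGHT, COMPLETED, EOS };

bool transition_allowed(TaskState from, TaskState to) {
    switch (to) {
    case TaskState::PENDING:
        return from == TaskState::PENDING || from == TaskState::IN_FLIGHT;
    case TaskState::IN_FLIGHT:
        return from == TaskState::PENDING || from == TaskState::IN_FLIGHT ||
               from == TaskState::COMPLETED;
    case TaskState::COMPLETED:
        return from == TaskState::IN_FLIGHT;
    case TaskState::EOS:
        return from == TaskState::IN_FLIGHT;
    }
    return false;
}
```

Making EOS a terminal state (no transition out of it) is what lets `is_eos()` replace the old mutable `eos` flag: once a task reports end-of-stream it can never be rescheduled.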
// ScannerContext is responsible for recording the execution status
@@ -115,7 +178,9 @@ public:
                    const TupleDescriptor* output_tuple_desc,
                    const RowDescriptor* output_row_descriptor,
                    const std::list<std::shared_ptr<ScannerDelegate>>& scanners, int64_t limit_,
-                   std::shared_ptr<Dependency> dependency, std::atomic<int64_t>* shared_scan_limit
+                   std::shared_ptr<Dependency> dependency, std::atomic<int64_t>* shared_scan_limit,
+                   std::shared_ptr<MemShareArbitrator> arb, std::shared_ptr<MemLimiter> limiter,
+                   int ins_idx, bool enable_adaptive_scan
 #ifdef BE_TEST
                    ,
                    int num_parallel_instances
@@ -125,6 +190,7 @@ public:
~ScannerContext() override;
Status init();
+    // TODO(gabriel): we could also consider returning a list of blocks to reduce scheduling
+    // overhead, but it may cause larger memory usage and more complex block management logic.
BlockUPtr get_free_block(bool force);
void return_free_block(BlockUPtr block);
void clear_free_blocks();
@@ -134,6 +200,7 @@ public:
     // Caller should make sure the pipeline task is still running when calling this function
     void update_peak_running_scanner(int num);
+    void reestimated_block_mem_bytes(int64_t num);
// Get next block from blocks queue. Called by ScanNode/ScanOperator
// Set eos to true if there is no more data to read.
@@ -186,7 +253,7 @@ public:
int32_t num_scheduled_scanners() {
std::lock_guard<std::mutex> l(_transfer_lock);
- return _num_scheduled_scanners;
+ return _in_flight_tasks_num;
}
Status schedule_scan_task(std::shared_ptr<ScanTask> current_scan_task,
@@ -208,9 +275,6 @@ protected:
const TupleDescriptor* _output_tuple_desc = nullptr;
const RowDescriptor* _output_row_descriptor = nullptr;
- std::mutex _transfer_lock;
- std::list<std::shared_ptr<ScanTask>> _tasks_queue;
-
Status _process_status = Status::OK();
std::atomic_bool _should_stop = false;
std::atomic_bool _is_finished = false;
@@ -229,10 +293,40 @@ protected:
     int64_t remaining_limit() const { return _shared_scan_limit->load(std::memory_order_acquire); }
int64_t _max_bytes_in_queue = 0;
-    // Using stack so that we can resubmit scanner in a LIFO order, maybe more cache friendly
-    std::stack<std::shared_ptr<ScanTask>> _pending_scanners;
-    // Scanner that is submitted to the scheduler.
-    std::atomic_int _num_scheduled_scanners = 0;
+    // _transfer_lock protects _completed_tasks, _pending_tasks, and all other shared state
+    // accessed by both the scanner thread pool and the operator (get_block_from_queue).
+    std::mutex _transfer_lock;
+
+    // Together, _completed_tasks and _in_flight_tasks_num represent all "occupied" concurrency
+    // slots. The scheduler uses their sum as the current concurrency:
+    //
+    //   current_concurrency = _completed_tasks.size() + _in_flight_tasks_num
+    //
+    // Lifecycle of a ScanTask:
+    //   _pending_tasks --(submit_scan_task)--> [thread pool] --(push_back_scan_task)-->
+    //   _completed_tasks --(get_block_from_queue)--> operator
+    // After consumption: a non-EOS task goes back to _pending_tasks; EOS increments
+    // _num_finished_scanners.
+
+    // Completed scan tasks whose cached_block is ready for the operator to consume.
+    // Protected by _transfer_lock. Written by push_back_scan_task() (scanner thread),
+    // read/popped by get_block_from_queue() (operator thread).
+    std::list<std::shared_ptr<ScanTask>> _completed_tasks;
+
+    // Scanners waiting to be submitted to the scheduler thread pool. Stored as a stack
+    // (LIFO) so that recently-used scanners are re-scheduled first, which is more likely
+    // to be cache-friendly. Protected by _transfer_lock. Populated in the constructor
+    // and by schedule_scan_task() when the concurrency limit is reached; drained by
+    // _pull_next_scan_task() during scheduling.
+    std::stack<std::shared_ptr<ScanTask>> _pending_tasks;
+
+    // Number of scan tasks currently submitted to the scanner scheduler thread pool
+    // (i.e. in-flight). Incremented by submit_scan_task() before submission and
+    // decremented by push_back_scan_task() when the thread pool returns the task.
+    // Declared atomic so it can be read without _transfer_lock in non-critical paths,
+    // but must be read under _transfer_lock whenever combined with _completed_tasks.size()
+    // to form a consistent concurrency snapshot.
+    std::atomic_int _in_flight_tasks_num = 0;
     // Scanner that is eos or error.
     int32_t _num_finished_scanners = 0;
// weak pointer for _scanners, used in stop function
@@ -265,6 +359,19 @@ protected:
int32_t _get_margin(std::unique_lock<std::mutex>& transfer_lock,
std::unique_lock<std::shared_mutex>& scheduler_lock);
+ // Memory-aware adaptive scheduling
+ std::shared_ptr<MemLimiter> _scanner_mem_limiter = nullptr;
+ std::shared_ptr<MemShareArbitrator> _mem_share_arb = nullptr;
+ std::shared_ptr<ScannerAdaptiveProcessor> _adaptive_processor = nullptr;
+ const int _ins_idx;
+ const bool _enable_adaptive_scanners = false;
+
+    // Adjust scan memory limit based on arbitrator feedback
+    void _adjust_scan_mem_limit(int64_t old_scanner_mem_bytes, int64_t new_scanner_mem_bytes);
+
+ // Calculate available scanner count for adaptive scheduling
+ int _available_pickup_scanner_count();
+
// TODO: Add implementation of runtime_info_feed_back
// adaptive scan concurrency related end
};
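The commit message describes the query-level `ScannerMemShareArbitrator` as splitting `query_mem_limit * max_scan_mem_ratio` proportionally across scan nodes. A sketch of one such proportional split, assuming each node reports a demand; this is an illustrative formulation, not the arbitrator's actual code (which tracks usage atomically and updates shares as scan nodes open and close):

```cpp
#include <algorithm>
#include <cstdint>

// Grant a scan node its proportional share of the global scan memory budget,
// never granting more than the node actually asked for.
int64_t proportional_share(int64_t scan_mem_limit, int64_t node_demand, int64_t total_demand) {
    if (total_demand <= 0) {
        return scan_mem_limit; // single consumer, grant everything
    }
    int64_t share = scan_mem_limit * node_demand / total_demand;
    return std::min(share, node_demand);
}
```

When total demand fits within the budget each node simply gets what it asked for; when demand is oversubscribed, every node's share shrinks in proportion, which is the "fair memory distribution" property the overview attributes to the arbitrator.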
diff --git a/be/src/exec/scan/scanner_scheduler.cpp b/be/src/exec/scan/scanner_scheduler.cpp
index 9961407bdbb..0a29b54696f 100644
--- a/be/src/exec/scan/scanner_scheduler.cpp
+++ b/be/src/exec/scan/scanner_scheduler.cpp
@@ -67,7 +67,8 @@ Status ScannerScheduler::submit(std::shared_ptr<ScannerContext> ctx,
return Status::OK();
}
- scanner_delegate->_scanner->start_wait_worker_timer();
+ scan_task->set_state(ScanTask::State::IN_FLIGHT);
+ scanner_delegate->_scanner->pause();
TabletStorageType type = scanner_delegate->_scanner->get_storage_type();
auto sumbit_task = [&]() {
auto work_func = [scanner_ref = scan_task, ctx]() {
@@ -161,13 +162,12 @@ void ScannerScheduler::_scanner_scan(std::shared_ptr<ScannerContext> ctx,
MonotonicStopWatch max_run_time_watch;
max_run_time_watch.start();
- scanner->update_wait_worker_timer();
- scanner->start_scan_cpu_timer();
+ scanner->resume();
bool need_update_profile = true;
auto update_scanner_profile = [&]() {
if (need_update_profile) {
- scanner->update_scan_cpu_timer();
+ scanner->pause();
scanner->update_realtime_counters();
need_update_profile = false;
}
@@ -175,12 +175,6 @@ void ScannerScheduler::_scanner_scan(std::shared_ptr<ScannerContext> ctx,
Status status = Status::OK();
bool eos = false;
-    Defer defer_scanner([&] {
-        if (status.ok() && !eos) {
-            // If status is not ok, the scanner failed and the counters may not be updated
-            // correctly, so there is no need to update them again. If eos is true, the scanner
-            // finished successfully and the counters are already correct.
-            scanner->start_wait_worker_timer();
-        }
-    });
ASSIGN_STATUS_IF_CATCH_EXCEPTION(
RuntimeState* state = ctx->state(); DCHECK(nullptr != state);
@@ -217,135 +211,90 @@ void ScannerScheduler::_scanner_scan(std::shared_ptr<ScannerContext> ctx,
}
}
-            size_t raw_bytes_read = 0;
-            bool first_read = true;
-            int64_t limit = scanner->limit();
-            // If the first block is full, then it is true. Or the first block + second block > batch_size
-            bool has_first_full_block = false;
-
-            // During low memory mode, every scan task will return at most 2 blocks to reduce memory usage.
-            while (!eos && raw_bytes_read < raw_bytes_threshold &&
-                   (!ctx->low_memory_mode() || !has_first_full_block) &&
-                   (!has_first_full_block ||
-                    doris::thread_context()
-                            ->thread_mem_tracker_mgr->limiter_mem_tracker()
-                            ->check_limit(1))) {
-                if (UNLIKELY(ctx->done())) {
-                    eos = true;
-                    break;
-                }
-                // If shared limit quota is exhausted, stop scanning.
-                if (ctx->remaining_limit() == 0) {
-                    eos = true;
-                    break;
-                }
-                if (max_run_time_watch.elapsed_time() >
-                    config::doris_scanner_max_run_time_ms * 1e6) {
-                    break;
-                }
- DEFER_RELEASE_RESERVED();
- BlockUPtr free_block;
- if (first_read) {
- free_block = ctx->get_free_block(first_read);
- } else {
- if (state->get_query_ctx()
- ->resource_ctx()
- ->task_controller()
- ->is_enable_reserve_memory()) {
- size_t block_avg_bytes =
scanner->get_block_avg_bytes();
- auto st =
thread_context()->thread_mem_tracker_mgr->try_reserve(
- block_avg_bytes);
- if (!st.ok()) {
- handle_reserve_memory_failure(state, ctx, st,
block_avg_bytes);
- break;
+            bool first_read = true;
+            int64_t limit = scanner->limit();
+            if (UNLIKELY(ctx->done())) {
+                eos = true;
+            } else if (ctx->remaining_limit() == 0) {
+                eos = true;
+            } else if (!eos) {
+                do {
+                    DEFER_RELEASE_RESERVED();
+                    BlockUPtr free_block;
+                    if (first_read) {
+                        free_block = ctx->get_free_block(first_read);
+                    } else {
+                        if (state->get_query_ctx()
+                                    ->resource_ctx()
+                                    ->task_controller()
+                                    ->is_enable_reserve_memory()) {
+                            size_t block_avg_bytes = scanner->get_block_avg_bytes();
+                            auto st = thread_context()->thread_mem_tracker_mgr->try_reserve(
+                                    block_avg_bytes);
+                            if (!st.ok()) {
+                                handle_reserve_memory_failure(state, ctx, st, block_avg_bytes);
+                                break;
+                            }
+                        }
+                        free_block = ctx->get_free_block(first_read);
+                    }
-                    free_block = ctx->get_free_block(first_read);
-                }
-                if (free_block == nullptr) {
-                    break;
-                }
-                // We got a new created block or a reused block.
-                status = scanner->get_block_after_projects(state, free_block.get(), &eos);
-                first_read = false;
-                if (!status.ok()) {
-                    LOG(WARNING) << "Scan thread read Scanner failed: " << status.to_string();
-                    break;
-                }
-                // Check column type only after block is read successfully.
-                // Or it may cause a crash when the block is not normal.
-                _make_sure_virtual_col_is_materialized(scanner, free_block.get());
-
-                // Shared limit quota: acquire rows from the context's shared pool.
-                // Discard or truncate the block if quota is exhausted.
-                if (free_block->rows() > 0) {
-                    int64_t block_rows = free_block->rows();
-                    int64_t granted = ctx->acquire_limit_quota(block_rows);
-                    if (granted == 0) {
-                        // No quota remaining, discard this block and mark eos.
-                        ctx->return_free_block(std::move(free_block));
-                        eos = true;
-                        break;
-                    } else if (granted < block_rows) {
-                        // Partial quota: truncate block to granted rows and mark eos.
-                        free_block->set_num_rows(granted);
-                        eos = true;
-                    }
-                }
+                    if (free_block == nullptr) {
+                        break;
+                    }
-                // Projection will truncate useless columns, makes block size change.
-                auto free_block_bytes = free_block->allocated_bytes();
-                raw_bytes_read += free_block_bytes;
-                if (!scan_task->cached_blocks.empty() &&
-                    scan_task->cached_blocks.back().first->rows() + free_block->rows() <=
-                            ctx->batch_size()) {
-                    size_t block_size = scan_task->cached_blocks.back().first->allocated_bytes();
-                    MutableBlock mutable_block(scan_task->cached_blocks.back().first.get());
-                    status = mutable_block.merge(*free_block);
-                    if (!status.ok()) {
-                        LOG(WARNING) << "Block merge failed: " << status.to_string();
-                        break;
-                    }
-                    scan_task->cached_blocks.back().second = mutable_block.allocated_bytes();
-                    scan_task->cached_blocks.back().first.get()->set_columns(
-                            std::move(mutable_block.mutable_columns()));
-
-                    // Return block succeed or not, this free_block is not used by this scan task any more.
-                    // If block can be reused, its memory usage will be added back.
-                    ctx->return_free_block(std::move(free_block));
-                    ctx->inc_block_usage(scan_task->cached_blocks.back().first->allocated_bytes() -
-                                         block_size);
-                } else {
-                    if (!scan_task->cached_blocks.empty()) {
-                        has_first_full_block = true;
-                    }
+                    // We got a new created block or a reused block.
+                    status = scanner->get_block_after_projects(state, free_block.get(), &eos);
+                    first_read = false;
+                    if (!status.ok()) {
+                        LOG(WARNING) << "Scan thread read Scanner failed: " << status.to_string();
+                        break;
+                    }
+                    // Check column type only after block is read successfully.
+                    // Or it may cause a crash when the block is not normal.
+                    _make_sure_virtual_col_is_materialized(scanner, free_block.get());
+
+ // Shared limit quota: acquire rows from the context's shared pool.
+ // Discard or truncate the block if quota is exhausted.
+ if (free_block->rows() > 0) {
+ int64_t block_rows = free_block->rows();
+ int64_t granted = ctx->acquire_limit_quota(block_rows);
+ if (granted == 0) {
+ // No quota remaining, discard this block and mark eos.
+ ctx->return_free_block(std::move(free_block));
+ eos = true;
+ break;
+ } else if (granted < block_rows) {
+ // Partial quota: truncate block to granted rows and mark eos.
+ free_block->set_num_rows(granted);
+ eos = true;
+ }
}
+ // Projection will truncate useless columns, makes block size change.
+ auto free_block_bytes = free_block->allocated_bytes();
+ ctx->reestimated_block_mem_bytes(cast_set<int64_t>(free_block_bytes));
+ DCHECK(scan_task->cached_block == nullptr);
ctx->inc_block_usage(free_block->allocated_bytes());
- scan_task->cached_blocks.emplace_back(std::move(free_block), free_block_bytes);
- }
+ scan_task->cached_block = std::move(free_block);
- // Per-scanner small-limit optimization: if limit is small (< batch_size),
- // return immediately instead of accumulating to raw_bytes_threshold.
- if (limit > 0 && limit < ctx->batch_size()) {
- break;
- }
+ // Per-scanner small-limit optimization: if limit is small (< batch_size),
+ // return immediately instead of accumulating to raw_bytes_threshold.
+ if (limit > 0 && limit < ctx->batch_size()) {
+ break;
+ }
- if (scan_task->cached_blocks.back().first->rows() > 0) {
- auto block_avg_bytes = (scan_task->cached_blocks.back().first->bytes() +
- scan_task->cached_blocks.back().first->rows() - 1) /
- scan_task->cached_blocks.back().first->rows() *
- ctx->batch_size();
- scanner->update_block_avg_bytes(block_avg_bytes);
- }
- if (ctx->low_memory_mode()) {
- ctx->clear_free_blocks();
- if (raw_bytes_threshold > ctx->low_memory_mode_scan_bytes_per_scanner()) {
- raw_bytes_threshold = ctx->low_memory_mode_scan_bytes_per_scanner();
+ if (scan_task->cached_block->rows() > 0) {
+ auto block_avg_bytes =
(scan_task->cached_block->bytes() +
+
scan_task->cached_block->rows() - 1) /
+ scan_task->cached_block->rows()
* ctx->batch_size();
+ scanner->update_block_avg_bytes(block_avg_bytes);
}
- }
- } // end for while
+ if (ctx->low_memory_mode()) {
+ ctx->clear_free_blocks();
+ }
+ } while (false);
+ }
- if (UNLIKELY(!status.ok())) {
- scan_task->set_status(status);
- eos = true;
- },
- status);
+ if (UNLIKELY(!status.ok())) {
+ scan_task->set_status(status);
+ eos = true;
+ },
+ status);
if (UNLIKELY(!status.ok())) {
scan_task->set_status(status);
@@ -357,14 +306,15 @@ void ScannerScheduler::_scanner_scan(std::shared_ptr<ScannerContext> ctx,
// so we need update_scanner_profile here
update_scanner_profile();
scanner->mark_to_need_to_close();
+ scan_task->set_state(ScanTask::State::EOS);
+ } else {
+ scan_task->set_state(ScanTask::State::COMPLETED);
}
- scan_task->set_eos(eos);
VLOG_DEBUG << fmt::format(
- "Scanner context {} has finished task, cached_block {} current scheduled task is "
+ "Scanner context {} has finished task, current scheduled task is "
"{}, eos: {}, status: {}",
- ctx->ctx_id, scan_task->cached_blocks.size(), ctx->num_scheduled_scanners(), eos,
- status.to_string());
+ ctx->ctx_id, ctx->num_scheduled_scanners(), eos, status.to_string());
ctx->push_back_scan_task(scan_task);
}
diff --git a/be/src/runtime/query_context.cpp b/be/src/runtime/query_context.cpp
index 24831a1f485..dea1d6ed776 100644
--- a/be/src/runtime/query_context.cpp
+++ b/be/src/runtime/query_context.cpp
@@ -138,6 +138,9 @@ QueryContext::QueryContext(TUniqueId query_id, ExecEnv* exec_env,
}
clock_gettime(CLOCK_MONOTONIC, &this->_query_arrival_timestamp);
DorisMetrics::instance()->query_ctx_cnt->increment(1);
+ _mem_arb = MemShareArbitrator::create_shared(
+ query_id, query_options.mem_limit,
+ query_options.__isset.max_scan_mem_ratio ? query_options.max_scan_mem_ratio : 1.0);
}
void QueryContext::_init_query_mem_tracker() {
diff --git a/be/src/runtime/query_context.h b/be/src/runtime/query_context.h
index 2cf8ec76e98..456970ca1a6 100644
--- a/be/src/runtime/query_context.h
+++ b/be/src/runtime/query_context.h
@@ -33,6 +33,7 @@
#include "common/factory_creator.h"
#include "common/object_pool.h"
#include "common/status.h"
+#include "exec/common/memory.h"
#include "exec/runtime_filter/runtime_filter_mgr.h"
#include "exec/scan/scanner_scheduler.h"
#include "runtime/exec_env.h"
@@ -220,6 +221,7 @@ public:
}
bool is_nereids() const { return _is_nereids; }
+ std::shared_ptr<MemShareArbitrator> mem_arb() const { return _mem_arb; }
WorkloadGroupPtr workload_group() const { return _resource_ctx->workload_group(); }
std::shared_ptr<MemTrackerLimiter> query_mem_tracker() const {
@@ -394,6 +396,7 @@ private:
// instance id + node id -> cte scan
std::map<std::pair<TUniqueId, int>, RecCTEScanLocalState*> _cte_scan;
std::mutex _cte_scan_lock;
+ std::shared_ptr<MemShareArbitrator> _mem_arb = nullptr;
public:
// when fragment of pipeline is closed, it will register its profile to this map by using add_fragment_profile
diff --git a/be/test/exec/scan/scanner_context_test.cpp b/be/test/exec/scan/scanner_context_test.cpp
index 18e815f7fe8..14a48e37a9a 100644
--- a/be/test/exec/scan/scanner_context_test.cpp
+++ b/be/test/exec/scan/scanner_context_test.cpp
@@ -149,7 +149,8 @@ TEST_F(ScannerContextTest, test_init) {
std::shared_ptr<ScannerContext> scanner_context = ScannerContext::create_shared(
state.get(), olap_scan_local_state.get(), output_tuple_desc, output_row_descriptor,
- scanners, limit, scan_dependency, &shared_limit, parallel_tasks);
+ scanners, limit, scan_dependency, &shared_limit, nullptr, nullptr, 0, false,
+ parallel_tasks);
scan_operator->_should_run_serial = false;
@@ -209,7 +210,8 @@ TEST_F(ScannerContextTest, test_serial_run) {
std::shared_ptr<ScannerContext> scanner_context = ScannerContext::create_shared(
state.get(), olap_scan_local_state.get(), output_tuple_desc, output_row_descriptor,
- scanners, limit, scan_dependency, &shared_limit, parallel_tasks);
+ scanners, limit, scan_dependency, &shared_limit, nullptr, nullptr, 0, false,
+ parallel_tasks);
scan_operator->_should_run_serial = true;
@@ -267,7 +269,8 @@ TEST_F(ScannerContextTest, test_max_column_reader_num) {
std::shared_ptr<ScannerContext> scanner_context = ScannerContext::create_shared(
state.get(), olap_scan_local_state.get(), output_tuple_desc, output_row_descriptor,
- scanners, limit, scan_dependency, &shared_limit, parallel_tasks);
+ scanners, limit, scan_dependency, &shared_limit, nullptr, nullptr, 0, false,
+ parallel_tasks);
scan_operator->_should_run_serial = false;
@@ -317,14 +320,15 @@ TEST_F(ScannerContextTest, test_push_back_scan_task) {
std::shared_ptr<ScannerContext> scanner_context = ScannerContext::create_shared(
state.get(), olap_scan_local_state.get(), output_tuple_desc, output_row_descriptor,
- scanners, limit, scan_dependency, &shared_limit, parallel_tasks);
+ scanners, limit, scan_dependency, &shared_limit, nullptr, nullptr, 0, false,
+ parallel_tasks);
- scanner_context->_num_scheduled_scanners = 11;
+ scanner_context->_in_flight_tasks_num = 11;
for (int i = 0; i < 5; ++i) {
auto scan_task = std::make_shared<ScanTask>(std::make_shared<ScannerDelegate>(scanner));
scanner_context->push_back_scan_task(scan_task);
- ASSERT_EQ(scanner_context->_num_scheduled_scanners, 10 - i);
+ ASSERT_EQ(scanner_context->_in_flight_tasks_num, 10 - i);
}
}
@@ -354,7 +358,8 @@ TEST_F(ScannerContextTest, get_margin) {
std::shared_ptr<ScannerContext> scanner_context = ScannerContext::create_shared(
state.get(), olap_scan_local_state.get(), output_tuple_desc, output_row_descriptor,
- scanners, limit, scan_dependency, &shared_limit, parallel_tasks);
+ scanners, limit, scan_dependency, &shared_limit, nullptr, nullptr, 0, false,
+ parallel_tasks);
std::mutex transfer_mutex;
std::unique_lock<std::mutex> transfer_lock(transfer_mutex);
@@ -408,7 +413,7 @@ TEST_F(ScannerContextTest, get_margin) {
EXPECT_CALL(*scheduler, get_queue_size()).WillOnce(testing::Return(10));
scanner_context->_scanner_scheduler = scheduler.get();
scanner_context->_min_scan_concurrency_of_scan_scheduler = 20;
- scanner_context->_num_scheduled_scanners = 0;
+ scanner_context->_in_flight_tasks_num = 0;
margin = scanner_context->_get_margin(transfer_lock, scheduler_lock);
ASSERT_EQ(margin, scanner_context->_min_scan_concurrency);
@@ -419,7 +424,7 @@ TEST_F(ScannerContextTest, get_margin) {
EXPECT_CALL(*scheduler, get_queue_size()).WillOnce(testing::Return(10));
scanner_context->_scanner_scheduler = scheduler.get();
scanner_context->_min_scan_concurrency_of_scan_scheduler = 20;
- scanner_context->_num_scheduled_scanners = 20;
+ scanner_context->_in_flight_tasks_num = 20;
margin = scanner_context->_get_margin(transfer_lock, scheduler_lock);
ASSERT_EQ(margin, 0);
}
@@ -450,7 +455,8 @@ TEST_F(ScannerContextTest, pull_next_scan_task) {
std::shared_ptr<ScannerContext> scanner_context = ScannerContext::create_shared(
state.get(), olap_scan_local_state.get(), output_tuple_desc, output_row_descriptor,
- scanners, limit, scan_dependency, &shared_limit, parallel_tasks);
+ scanners, limit, scan_dependency, &shared_limit, nullptr, nullptr, 0, false,
+ parallel_tasks);
std::mutex transfer_mutex;
std::unique_lock<std::mutex> transfer_lock(transfer_mutex);
@@ -474,26 +480,27 @@ TEST_F(ScannerContextTest, pull_next_scan_task) {
scanner_context->_max_scan_concurrency = 2;
BlockUPtr cached_block = Block::create_unique();
- scan_task->cached_blocks.emplace_back(std::move(cached_block), 0);
+ scan_task->cached_block = std::move(cached_block);
EXPECT_ANY_THROW(scanner_context->_pull_next_scan_task(
scan_task, scanner_context->_max_scan_concurrency - 1));
- scan_task->cached_blocks.clear();
- scan_task->eos = true;
+ scan_task->cached_block.reset();
+ scan_task->_state = ScanTask::State::IN_FLIGHT;
+ scan_task->set_state(ScanTask::State::EOS);
EXPECT_ANY_THROW(scanner_context->_pull_next_scan_task(
scan_task, scanner_context->_max_scan_concurrency - 1));
- scan_task->cached_blocks.clear();
- scan_task->eos = false;
+ scan_task->cached_block.reset();
+ scan_task->_state = ScanTask::State::IN_FLIGHT;
pull_scan_task = scanner_context->_pull_next_scan_task(
scan_task, scanner_context->_max_scan_concurrency - 1);
EXPECT_EQ(pull_scan_task.get(), scan_task.get());
- scanner_context->_pending_scanners = std::stack<std::shared_ptr<ScanTask>>();
+ scanner_context->_pending_tasks = std::stack<std::shared_ptr<ScanTask>>();
pull_scan_task = scanner_context->_pull_next_scan_task(
nullptr, scanner_context->_max_scan_concurrency - 1);
EXPECT_EQ(pull_scan_task, nullptr);
- scanner_context->_pending_scanners.push(
+ scanner_context->_pending_tasks.push(
std::make_shared<ScanTask>(std::make_shared<ScannerDelegate>(scanner)));
pull_scan_task = scanner_context->_pull_next_scan_task(
nullptr, scanner_context->_max_scan_concurrency - 1);
@@ -526,7 +533,8 @@ TEST_F(ScannerContextTest, schedule_scan_task) {
std::shared_ptr<ScannerContext> scanner_context = ScannerContext::create_shared(
state.get(), olap_scan_local_state.get(), output_tuple_desc, output_row_descriptor,
- scanners, limit, scan_dependency, &shared_limit, parallel_tasks);
+ scanners, limit, scan_dependency, &shared_limit, nullptr, nullptr, 0, false,
+ parallel_tasks);
std::mutex transfer_mutex;
std::unique_lock<std::mutex> transfer_lock(transfer_mutex);
@@ -547,18 +555,19 @@ TEST_F(ScannerContextTest, schedule_scan_task) {
Status st = scanner_context->schedule_scan_task(nullptr, transfer_lock,
scheduler_lock);
ASSERT_TRUE(st.ok());
- ASSERT_EQ(scanner_context->_num_scheduled_scanners, 1);
+ ASSERT_EQ(scanner_context->_in_flight_tasks_num, 1);
scanner_context->_max_scan_concurrency = 10;
scanner_context->_max_scan_concurrency = 1;
scanner_context->_min_scan_concurrency_of_scan_scheduler = 20;
st = scanner_context->schedule_scan_task(nullptr, transfer_lock,
scheduler_lock);
ASSERT_TRUE(st.ok());
- ASSERT_EQ(scanner_context->_num_scheduled_scanners, scanner_context->_max_scan_concurrency);
+ ASSERT_EQ(scanner_context->_in_flight_tasks_num, scanner_context->_max_scan_concurrency);
- scanner_context = ScannerContext::create_shared(
- state.get(), olap_scan_local_state.get(), output_tuple_desc, output_row_descriptor,
- scanners, limit, scan_dependency, &shared_limit, parallel_tasks);
+ scanner_context = ScannerContext::create_shared(state.get(), olap_scan_local_state.get(),
+ output_tuple_desc, output_row_descriptor,
+ scanners, limit, scan_dependency, &shared_limit,
+ nullptr, nullptr, 0, false, parallel_tasks);
scanner_context->_scanner_scheduler = scheduler.get();
@@ -570,16 +579,17 @@ TEST_F(ScannerContextTest, schedule_scan_task) {
st = scanner_context->schedule_scan_task(nullptr, transfer_lock,
scheduler_lock);
ASSERT_TRUE(st.ok());
// 15 since we have 15 scanners.
- ASSERT_EQ(scanner_context->_num_scheduled_scanners, 15);
+ ASSERT_EQ(scanner_context->_in_flight_tasks_num, 15);
scanners = std::list<std::shared_ptr<ScannerDelegate>>();
for (int i = 0; i < 1; ++i) {
scanners.push_back(std::make_shared<ScannerDelegate>(scanner));
}
- scanner_context = ScannerContext::create_shared(
- state.get(), olap_scan_local_state.get(), output_tuple_desc, output_row_descriptor,
- scanners, limit, scan_dependency, &shared_limit, parallel_tasks);
+ scanner_context = ScannerContext::create_shared(state.get(), olap_scan_local_state.get(),
+ output_tuple_desc, output_row_descriptor,
+ scanners, limit, scan_dependency, &shared_limit,
+ nullptr, nullptr, 0, false, parallel_tasks);
scanner_context->_scanner_scheduler = scheduler.get();
@@ -590,12 +600,13 @@ TEST_F(ScannerContextTest, schedule_scan_task) {
auto scan_task = std::make_shared<ScanTask>(std::make_shared<ScannerDelegate>(scanner));
st = scanner_context->schedule_scan_task(scan_task, transfer_lock,
scheduler_lock);
// current scan task is added back.
- ASSERT_EQ(scanner_context->_pending_scanners.size(), 1);
- ASSERT_EQ(scanner_context->_num_scheduled_scanners, 1);
+ ASSERT_EQ(scanner_context->_pending_tasks.size(), 1);
+ ASSERT_EQ(scanner_context->_in_flight_tasks_num, 1);
- scanner_context = ScannerContext::create_shared(
- state.get(), olap_scan_local_state.get(), output_tuple_desc, output_row_descriptor,
- scanners, limit, scan_dependency, &shared_limit, parallel_tasks);
+ scanner_context = ScannerContext::create_shared(state.get(), olap_scan_local_state.get(),
+ output_tuple_desc, output_row_descriptor,
+ scanners, limit, scan_dependency, &shared_limit,
+ nullptr, nullptr, 0, false, parallel_tasks);
scanner_context->_scanner_scheduler = scheduler.get();
@@ -604,7 +615,7 @@ TEST_F(ScannerContextTest, schedule_scan_task) {
scanner_context->_min_scan_concurrency_of_scan_scheduler = 20;
st = scanner_context->schedule_scan_task(nullptr, transfer_lock,
scheduler_lock);
scan_task = std::make_shared<ScanTask>(std::make_shared<ScannerDelegate>(scanner));
- scan_task->cached_blocks.emplace_back(Block::create_unique(), 0);
+ scan_task->cached_block = Block::create_unique();
// Illigeal situation.
// If current scan task has cached block, it should not be called with this methods.
EXPECT_ANY_THROW(std::ignore = scanner_context->schedule_scan_task(scan_task, transfer_lock,
@@ -648,7 +659,8 @@ TEST_F(ScannerContextTest, scan_queue_mem_limit) {
std::shared_ptr<ScannerContext> scanner_context = ScannerContext::create_shared(
state.get(), olap_scan_local_state.get(), output_tuple_desc, output_row_descriptor,
- scanners, limit, scan_dependency, &shared_limit, parallel_tasks);
+ scanners, limit, scan_dependency, &shared_limit, nullptr, nullptr, 0, false,
+ parallel_tasks);
std::unique_ptr<MockSimplifiedScanScheduler> scheduler =
std::make_unique<MockSimplifiedScanScheduler>(cgroup_cpu_ctl);
@@ -688,7 +700,8 @@ TEST_F(ScannerContextTest, get_free_block) {
std::shared_ptr<ScannerContext> scanner_context = ScannerContext::create_shared(
state.get(), olap_scan_local_state.get(), output_tuple_desc, output_row_descriptor,
- scanners, limit, scan_dependency, &shared_limit, parallel_tasks);
+ scanners, limit, scan_dependency, &shared_limit, nullptr, nullptr, 0, false,
+ parallel_tasks);
scanner_context->_newly_create_free_blocks_num = newly_create_free_blocks_num.get();
scanner_context->_newly_create_free_blocks_num->set(0L);
scanner_context->_scanner_memory_used_counter = scanner_memory_used_counter.get();
@@ -741,7 +754,8 @@ TEST_F(ScannerContextTest, return_free_block) {
std::shared_ptr<ScannerContext> scanner_context = ScannerContext::create_shared(
state.get(), olap_scan_local_state.get(), output_tuple_desc, output_row_descriptor,
- scanners, limit, scan_dependency, &shared_limit, parallel_tasks);
+ scanners, limit, scan_dependency, &shared_limit, nullptr, nullptr, 0, false,
+ parallel_tasks);
scanner_context->_newly_create_free_blocks_num = newly_create_free_blocks_num.get();
scanner_context->_scanner_memory_used_counter = scanner_memory_used_counter.get();
scanner_context->_max_bytes_in_queue = 200;
@@ -785,7 +799,8 @@ TEST_F(ScannerContextTest, get_block_from_queue) {
std::shared_ptr<ScannerContext> scanner_context = ScannerContext::create_shared(
state.get(), olap_scan_local_state.get(), output_tuple_desc, output_row_descriptor,
- scanners, limit, scan_dependency, &shared_limit, parallel_tasks);
+ scanners, limit, scan_dependency, &shared_limit, nullptr, nullptr, 0, false,
+ parallel_tasks);
scanner_context->_newly_create_free_blocks_num = newly_create_free_blocks_num.get();
scanner_context->_scanner_memory_used_counter = scanner_memory_used_counter.get();
scanner_context->_max_bytes_in_queue = 200;
@@ -819,8 +834,9 @@ TEST_F(ScannerContextTest, get_block_from_queue) {
scanner_context->_is_finished = false;
scanner_context->_should_stop = false;
auto scan_task = std::make_shared<ScanTask>(std::make_shared<ScannerDelegate>(scanner));
- scan_task->set_eos(true);
- scanner_context->_tasks_queue.push_back(scan_task);
+ scan_task->_state = ScanTask::State::IN_FLIGHT;
+ scan_task->set_state(ScanTask::State::EOS);
+ scanner_context->_completed_tasks.push_back(scan_task);
std::unique_ptr<MockSimplifiedScanScheduler> scheduler =
std::make_unique<MockSimplifiedScanScheduler>(cgroup_cpu_ctl);
EXPECT_CALL(*scheduler, schedule_scan_task(testing::_, testing::_, testing::_))
@@ -834,4 +850,440 @@ TEST_F(ScannerContextTest, get_block_from_queue) {
EXPECT_EQ(scanner_context->_num_finished_scanners, 1);
}
+/**
+ MemShareArbitrator Tests (5 tests)
+ - scanner_mem_share_arbitrator_basic: Tests initialization, query_id, memory limits, and initial state
+ - scanner_mem_share_arbitrator_register_scan_node: Tests registering scan nodes and default memory allocation (64MB)
+ - scanner_mem_share_arbitrator_update_mem_bytes: Tests updating memory bytes and handling zero values
+ - scanner_mem_share_arbitrator_proportional_sharing: Tests proportional memory distribution across multiple contexts
+ - scanner_mem_share_arbitrator_zero_ratio: Tests edge case with zero scan ratio
+
+ MemLimiter Tests (9 tests)
+
+ - scanner_mem_limiter_basic: Tests initialization and default values
+ - scanner_mem_limiter_reestimated_block_mem_bytes: Tests averaging algorithm for block memory estimation
+ - scanner_mem_limiter_reestimated_zero_value: Tests that zero values are properly ignored
+ - scanner_mem_limiter_available_scanner_count: Tests scanner count calculation based on memory limits
+ - scanner_mem_limiter_serial_scan: Tests serial scan mode behavior
+ - scanner_mem_limiter_update_running_tasks_count: Tests atomic counter updates
+ - scanner_mem_limiter_update_open_tasks_count: Tests context count tracking
+ - scanner_mem_limiter_update_arb_mem_bytes: Tests memory capping at query limit
+ - scanner_mem_limiter_available_count_distribution: Tests fair distribution across parallel instances
+
+ ScannerContext with Memory Control Tests (4 tests)
+ - scanner_context_with_adaptive_memory: Tests integration with arbitrator and limiter
+ - scanner_context_adjust_scan_mem_limit: Tests dynamic memory limit adjustment
+ - scanner_context_reestimated_block_mem_bytes: Tests block memory re-estimation propagation
+ - scanner_context_update_peak_running_scanner: Tests peak scanner tracking with memory control
+
+ Total: 18 new test cases
+
+ All tests follow the existing patterns in the codebase and cover:
+ - Normal operation scenarios
+ - Edge cases (zero values, limits, etc.)
+ - Integration between components
+ - Atomic operations and thread safety
+ - Memory distribution algorithms
+*/
+// ==================== MemShareArbitrator Tests ====================
+TEST_F(ScannerContextTest, scanner_mem_share_arbitrator_basic) {
+ TUniqueId query_id;
+ query_id.hi = 1;
+ query_id.lo = 2;
+ int64_t query_mem_limit = 1024 * 1024 * 1024;
+ double max_scan_ratio = 0.3;
+
+ auto arbitrator = MemShareArbitrator::create_shared(query_id, query_mem_limit, max_scan_ratio);
+
+ ASSERT_EQ(arbitrator->query_id.hi, 1);
+ ASSERT_EQ(arbitrator->query_id.lo, 2);
+ ASSERT_EQ(arbitrator->query_mem_limit, query_mem_limit);
+ ASSERT_EQ(arbitrator->mem_limit, static_cast<int64_t>(query_mem_limit * max_scan_ratio));
+ ASSERT_EQ(arbitrator->total_mem_bytes.load(), 0);
+}
+
+TEST_F(ScannerContextTest, scanner_mem_share_arbitrator_register_scan_node) {
+ TUniqueId query_id;
+ query_id.hi = 1;
+ query_id.lo = 2;
+ int64_t query_mem_limit = 1024 * 1024 * 1024;
+ double max_scan_ratio = 0.3;
+
+ auto arbitrator = MemShareArbitrator::create_shared(query_id, query_mem_limit, max_scan_ratio);
+
+ arbitrator->register_scan_node();
+ ASSERT_EQ(arbitrator->total_mem_bytes.load(), 64 * 1024 * 1024);
+
+ arbitrator->register_scan_node();
+ ASSERT_EQ(arbitrator->total_mem_bytes.load(), 128 * 1024 * 1024);
+}
+
+TEST_F(ScannerContextTest, scanner_mem_share_arbitrator_update_mem_bytes) {
+ TUniqueId query_id;
+ query_id.hi = 1;
+ query_id.lo = 2;
+ int64_t query_mem_limit = 1024 * 1024 * 1024;
+ double max_scan_ratio = 0.3;
+
+ auto arbitrator = MemShareArbitrator::create_shared(query_id, query_mem_limit, max_scan_ratio);
+
+ int64_t new_limit = arbitrator->update_mem_bytes(0, 100 * 1024 * 1024);
+ ASSERT_EQ(arbitrator->total_mem_bytes.load(), 100 * 1024 * 1024);
+ ASSERT_GT(new_limit, 0);
+
+ new_limit = arbitrator->update_mem_bytes(100 * 1024 * 1024, 0);
+ ASSERT_EQ(new_limit, 0);
+ ASSERT_EQ(arbitrator->total_mem_bytes.load(), 0);
+}
+
+TEST_F(ScannerContextTest, scanner_mem_share_arbitrator_proportional_sharing) {
+ TUniqueId query_id;
+ query_id.hi = 1;
+ query_id.lo = 2;
+ int64_t query_mem_limit = 1024 * 1024 * 1024;
+ double max_scan_ratio = 0.5;
+
+ auto arbitrator = MemShareArbitrator::create_shared(query_id, query_mem_limit, max_scan_ratio);
+
+ int64_t limit1 = arbitrator->update_mem_bytes(0, 200 * 1024 * 1024);
+ int64_t limit2 = arbitrator->update_mem_bytes(0, 300 * 1024 * 1024);
+
+ ASSERT_LT(limit2, limit1);
+ ASSERT_EQ(arbitrator->total_mem_bytes.load(), 500 * 1024 * 1024);
+}
+
+TEST_F(ScannerContextTest, scanner_mem_share_arbitrator_zero_ratio) {
+ TUniqueId query_id;
+ query_id.hi = 1;
+ query_id.lo = 2;
+ int64_t query_mem_limit = 1024 * 1024 * 1024;
+ double max_scan_ratio = 0.0;
+
+ auto arbitrator = MemShareArbitrator::create_shared(query_id, query_mem_limit, max_scan_ratio);
+
+ ASSERT_GE(arbitrator->mem_limit, 1);
+}
+
+// ==================== MemLimiter Tests ====================
+TEST_F(ScannerContextTest, scanner_mem_limiter_basic) {
+ TUniqueId query_id;
+ query_id.hi = 1;
+ query_id.lo = 2;
+ int64_t parallelism = 4;
+ bool serial_scan = false;
+ int64_t mem_limit = 512 * 1024 * 1024;
+
+ auto limiter = MemLimiter::create_shared(query_id, parallelism, serial_scan, mem_limit);
+
+ ASSERT_EQ(limiter->get_estimated_block_mem_bytes(), 0);
+ ASSERT_EQ(limiter->get_arb_scanner_mem_bytes(), 0);
+}
+
+TEST_F(ScannerContextTest, scanner_mem_limiter_reestimated_block_mem_bytes) {
+ TUniqueId query_id;
+ query_id.hi = 1;
+ query_id.lo = 2;
+ int64_t parallelism = 4;
+ bool serial_scan = false;
+ int64_t mem_limit = 512 * 1024 * 1024;
+
+ auto limiter = MemLimiter::create_shared(query_id, parallelism, serial_scan, mem_limit);
+
+ limiter->reestimated_block_mem_bytes(100 * 1024 * 1024);
+ ASSERT_EQ(limiter->get_estimated_block_mem_bytes(), 100 * 1024 * 1024);
+
+ limiter->reestimated_block_mem_bytes(200 * 1024 * 1024);
+ ASSERT_EQ(limiter->get_estimated_block_mem_bytes(), 150 * 1024 * 1024);
+
+ limiter->reestimated_block_mem_bytes(300 * 1024 * 1024);
+ ASSERT_EQ(limiter->get_estimated_block_mem_bytes(), 200 * 1024 * 1024);
+}
+
+TEST_F(ScannerContextTest, scanner_mem_limiter_reestimated_zero_value) {
+ TUniqueId query_id;
+ query_id.hi = 1;
+ query_id.lo = 2;
+ int64_t parallelism = 4;
+ bool serial_scan = false;
+ int64_t mem_limit = 512 * 1024 * 1024;
+
+ auto limiter = MemLimiter::create_shared(query_id, parallelism, serial_scan, mem_limit);
+
+ limiter->reestimated_block_mem_bytes(100 * 1024 * 1024);
+ ASSERT_EQ(limiter->get_estimated_block_mem_bytes(), 100 * 1024 * 1024);
+
+ limiter->reestimated_block_mem_bytes(0);
+ ASSERT_EQ(limiter->get_estimated_block_mem_bytes(), 100 * 1024 * 1024);
+}
+
+TEST_F(ScannerContextTest, scanner_mem_limiter_available_scanner_count) {
+ TUniqueId query_id;
+ query_id.hi = 1;
+ query_id.lo = 2;
+ int64_t parallelism = 4;
+ bool serial_scan = false;
+ int64_t mem_limit = 512 * 1024 * 1024;
+
+ auto limiter = MemLimiter::create_shared(query_id, parallelism, serial_scan, mem_limit);
+
+ limiter->update_mem_limit(400 * 1024 * 1024);
+ limiter->reestimated_block_mem_bytes(100 * 1024 * 1024);
+
+ int count = limiter->available_scanner_count(0);
+ ASSERT_GE(count, 1);
+}
+
+TEST_F(ScannerContextTest, scanner_mem_limiter_serial_scan) {
+ TUniqueId query_id;
+ query_id.hi = 1;
+ query_id.lo = 2;
+ int64_t parallelism = 4;
+ bool serial_scan = true;
+ int64_t mem_limit = 512 * 1024 * 1024;
+
+ auto limiter = MemLimiter::create_shared(query_id, parallelism, serial_scan, mem_limit);
+
+ limiter->update_mem_limit(400 * 1024 * 1024);
+ limiter->reestimated_block_mem_bytes(100 * 1024 * 1024);
+
+ int count = limiter->available_scanner_count(0);
+ ASSERT_GE(count, 1);
+}
+
+TEST_F(ScannerContextTest, scanner_mem_limiter_update_running_tasks_count) {
+ TUniqueId query_id;
+ query_id.hi = 1;
+ query_id.lo = 2;
+ int64_t parallelism = 4;
+ bool serial_scan = false;
+ int64_t mem_limit = 512 * 1024 * 1024;
+
+ auto limiter = MemLimiter::create_shared(query_id, parallelism, serial_scan, mem_limit);
+
+ ASSERT_EQ(limiter->update_running_tasks_count(5), 5);
+ ASSERT_EQ(limiter->update_running_tasks_count(-2), 3);
+ ASSERT_EQ(limiter->update_running_tasks_count(1), 4);
+}
+
+TEST_F(ScannerContextTest, scanner_mem_limiter_update_open_tasks_count) {
+ TUniqueId query_id;
+ query_id.hi = 1;
+ query_id.lo = 2;
+ int64_t parallelism = 4;
+ bool serial_scan = false;
+ int64_t mem_limit = 512 * 1024 * 1024;
+
+ auto limiter = MemLimiter::create_shared(query_id, parallelism, serial_scan, mem_limit);
+
+ ASSERT_EQ(limiter->update_open_tasks_count(1), 0);
+ ASSERT_EQ(limiter->update_open_tasks_count(1), 1);
+ ASSERT_EQ(limiter->update_open_tasks_count(-1), 2);
+ ASSERT_EQ(limiter->update_open_tasks_count(-1), 1);
+}
+
+TEST_F(ScannerContextTest, scanner_mem_limiter_update_arb_mem_bytes) {
+ TUniqueId query_id;
+ query_id.hi = 1;
+ query_id.lo = 2;
+ int64_t parallelism = 4;
+ bool serial_scan = false;
+ int64_t mem_limit = 512 * 1024 * 1024;
+
+ auto limiter = MemLimiter::create_shared(query_id, parallelism, serial_scan, mem_limit);
+
+ limiter->update_arb_mem_bytes(100 * 1024 * 1024);
+ ASSERT_EQ(limiter->get_arb_scanner_mem_bytes(), 100 * 1024 * 1024);
+
+ limiter->update_arb_mem_bytes(1024 * 1024 * 1024);
+ ASSERT_EQ(limiter->get_arb_scanner_mem_bytes(), mem_limit);
+}
+
+TEST_F(ScannerContextTest, scanner_mem_limiter_available_count_distribution) {
+ TUniqueId query_id;
+ query_id.hi = 1;
+ query_id.lo = 2;
+ int64_t parallelism = 3;
+ bool serial_scan = false;
+ int64_t mem_limit = 512 * 1024 * 1024;
+
+ auto limiter = MemLimiter::create_shared(query_id, parallelism, serial_scan, mem_limit);
+
+ limiter->update_mem_limit(500 * 1024 * 1024);
+ limiter->reestimated_block_mem_bytes(100 * 1024 * 1024);
+
+ int count0 = limiter->available_scanner_count(0);
+ int count1 = limiter->available_scanner_count(1);
+ int count2 = limiter->available_scanner_count(2);
+
+ ASSERT_GE(count0, 1);
+ ASSERT_GE(count1, 1);
+ ASSERT_GE(count2, 1);
+}
+
+// ==================== ScannerContext with Memory Control Tests ====================
+TEST_F(ScannerContextTest, scanner_context_with_adaptive_memory) {
+ const int parallel_tasks = 2;
+ auto scan_operator = std::make_unique<OlapScanOperatorX>(obj_pool.get(), tnode, 0, *descs,
+ parallel_tasks, TQueryCacheParam {});
+
+ auto olap_scan_local_state =
+ OlapScanLocalState::create_unique(state.get(), scan_operator.get());
+
+ const int64_t limit = 100;
+
+ OlapScanner::Params scanner_params;
+ scanner_params.state = state.get();
+ scanner_params.profile = profile.get();
+ scanner_params.limit = limit;
+ scanner_params.key_ranges = std::vector<OlapScanRange*>();
+
+ std::shared_ptr<Scanner> scanner =
+ OlapScanner::create_shared(olap_scan_local_state.get(), std::move(scanner_params));
+
+ std::list<std::shared_ptr<ScannerDelegate>> scanners;
+ for (int i = 0; i < 5; ++i) {
+ scanners.push_back(std::make_shared<ScannerDelegate>(scanner));
+ }
+
+ TUniqueId query_id = state->get_query_ctx()->query_id();
+ int64_t query_mem_limit = 1024 * 1024 * 1024;
+ auto arbitrator = MemShareArbitrator::create_shared(query_id, query_mem_limit, 0.3);
+ auto limiter = MemLimiter::create_shared(query_id, parallel_tasks, false,
+ static_cast<int64_t>(query_mem_limit * 0.3));
+
+ std::shared_ptr<ScannerContext> scanner_context = ScannerContext::create_shared(
+ state.get(), olap_scan_local_state.get(), output_tuple_desc, output_row_descriptor,
+ scanners, limit, scan_dependency, &shared_limit, arbitrator, limiter, 0, true,
+ parallel_tasks);
+
+ limiter->update_open_tasks_count(1);
+ ASSERT_TRUE(scanner_context->_enable_adaptive_scanners);
+ ASSERT_NE(scanner_context->_mem_share_arb, nullptr);
+ ASSERT_NE(scanner_context->_scanner_mem_limiter, nullptr);
+}
+
+TEST_F(ScannerContextTest, scanner_context_adjust_scan_mem_limit) {
+ const int parallel_tasks = 2;
+ auto scan_operator = std::make_unique<OlapScanOperatorX>(obj_pool.get(), tnode, 0, *descs,
+ parallel_tasks, TQueryCacheParam {});
+
+ auto olap_scan_local_state =
+ OlapScanLocalState::create_unique(state.get(), scan_operator.get());
+
+ const int64_t limit = 100;
+
+ OlapScanner::Params scanner_params;
+ scanner_params.state = state.get();
+ scanner_params.profile = profile.get();
+ scanner_params.limit = limit;
+ scanner_params.key_ranges = std::vector<OlapScanRange*>();
+
+ std::shared_ptr<Scanner> scanner =
+ OlapScanner::create_shared(olap_scan_local_state.get(), std::move(scanner_params));
+
+ std::list<std::shared_ptr<ScannerDelegate>> scanners;
+ for (int i = 0; i < 5; ++i) {
+ scanners.push_back(std::make_shared<ScannerDelegate>(scanner));
+ }
+
+ TUniqueId query_id = state->get_query_ctx()->query_id();
+ int64_t query_mem_limit = 1024 * 1024 * 1024;
+ auto arbitrator = MemShareArbitrator::create_shared(query_id, query_mem_limit, 0.3);
+ auto limiter = MemLimiter::create_shared(query_id, parallel_tasks, false,
+ static_cast<int64_t>(query_mem_limit * 0.3));
+
+ std::shared_ptr<ScannerContext> scanner_context = ScannerContext::create_shared(
+ state.get(), olap_scan_local_state.get(), output_tuple_desc, output_row_descriptor,
+ scanners, limit, scan_dependency, &shared_limit, arbitrator, limiter, 0, true,
+ parallel_tasks);
+
+ int64_t old_mem = 100 * 1024 * 1024;
+ int64_t new_mem = 200 * 1024 * 1024;
+ scanner_context->_adjust_scan_mem_limit(old_mem, new_mem);
+
+ limiter->update_open_tasks_count(1);
+ ASSERT_GT(arbitrator->total_mem_bytes.load(), 0);
+}
+
+TEST_F(ScannerContextTest, scanner_context_reestimated_block_mem_bytes) {
+ const int parallel_tasks = 2;
+ auto scan_operator = std::make_unique<OlapScanOperatorX>(obj_pool.get(), tnode, 0, *descs,
+ parallel_tasks, TQueryCacheParam {});
+
+ auto olap_scan_local_state =
+ OlapScanLocalState::create_unique(state.get(),
scan_operator.get());
+
+ const int64_t limit = 100;
+
+ OlapScanner::Params scanner_params;
+ scanner_params.state = state.get();
+ scanner_params.profile = profile.get();
+ scanner_params.limit = limit;
+ scanner_params.key_ranges = std::vector<OlapScanRange*>();
+
+ std::shared_ptr<Scanner> scanner =
+ OlapScanner::create_shared(olap_scan_local_state.get(),
std::move(scanner_params));
+
+ std::list<std::shared_ptr<ScannerDelegate>> scanners;
+ for (int i = 0; i < 5; ++i) {
+ scanners.push_back(std::make_shared<ScannerDelegate>(scanner));
+ }
+
+ TUniqueId query_id = state->get_query_ctx()->query_id();
+ int64_t query_mem_limit = 1024 * 1024 * 1024;
+ auto arbitrator = MemShareArbitrator::create_shared(query_id,
query_mem_limit, 0.3);
+ auto limiter = MemLimiter::create_shared(query_id, parallel_tasks, false,
+
static_cast<int64_t>(query_mem_limit * 0.3));
+
+ std::shared_ptr<ScannerContext> scanner_context =
ScannerContext::create_shared(
+ state.get(), olap_scan_local_state.get(), output_tuple_desc,
output_row_descriptor,
+ scanners, limit, scan_dependency, &shared_limit, arbitrator,
limiter, 0, true,
+ parallel_tasks);
+
+ scanner_context->reestimated_block_mem_bytes(150 * 1024 * 1024);
+ ASSERT_GT(limiter->get_estimated_block_mem_bytes(), 0);
+ limiter->update_open_tasks_count(1);
+}
+
+TEST_F(ScannerContextTest, scanner_context_update_peak_running_scanner) {
+    const int parallel_tasks = 2;
+    auto scan_operator = std::make_unique<OlapScanOperatorX>(obj_pool.get(), tnode, 0, *descs,
+                                                             parallel_tasks, TQueryCacheParam {});
+
+    auto olap_scan_local_state =
+            OlapScanLocalState::create_unique(state.get(), scan_operator.get());
+    olap_scan_local_state->_parent = scan_operator.get();
+
+    const int64_t limit = 100;
+
+    OlapScanner::Params scanner_params;
+    scanner_params.state = state.get();
+    scanner_params.profile = profile.get();
+    scanner_params.limit = limit;
+    scanner_params.key_ranges = std::vector<OlapScanRange*>();
+
+    std::shared_ptr<Scanner> scanner =
+            OlapScanner::create_shared(olap_scan_local_state.get(), std::move(scanner_params));
+
+    std::list<std::shared_ptr<ScannerDelegate>> scanners;
+    for (int i = 0; i < 5; ++i) {
+        scanners.push_back(std::make_shared<ScannerDelegate>(scanner));
+    }
+
+    TUniqueId query_id = state->get_query_ctx()->query_id();
+    int64_t query_mem_limit = 1024 * 1024 * 1024;
+    auto arbitrator = MemShareArbitrator::create_shared(query_id, query_mem_limit, 0.3);
+    auto limiter = MemLimiter::create_shared(query_id, parallel_tasks, false,
+                                             static_cast<int64_t>(query_mem_limit * 0.3));
+
+    std::shared_ptr<ScannerContext> scanner_context = ScannerContext::create_shared(
+            state.get(), olap_scan_local_state.get(), output_tuple_desc, output_row_descriptor,
+            scanners, limit, scan_dependency, &shared_limit, arbitrator, limiter, 0, true,
+            parallel_tasks);
+
+    scanner_context->update_peak_running_scanner(3);
+    ASSERT_EQ(limiter->update_running_tasks_count(0), 3);
+    limiter->update_open_tasks_count(1);
+}
+
} // namespace doris
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
index 8345f430c90..6598a2e5f9f 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
@@ -90,6 +90,8 @@ public class SessionVariable implements Serializable, Writable {
public static final List<Field> affectQueryResultFields;
public static final List<Field> affectQueryResultInPlanFields;
public static final String EXEC_MEM_LIMIT = "exec_mem_limit";
+ public static final String MAX_SCAN_MEM_RATIO = "max_scan_mem_ratio";
+ public static final String ENABLE_ADAPTIVE_SCAN = "enable_adaptive_scan";
public static final String LOCAL_EXCHANGE_FREE_BLOCKS_LIMIT = "local_exchange_free_blocks_limit";
public static final String SCAN_QUEUE_MEM_LIMIT = "scan_queue_mem_limit";
public static final String MAX_SCANNERS_CONCURRENCY = "max_scanners_concurrency";
@@ -1056,6 +1058,10 @@ public class SessionVariable implements Serializable, Writable {
// max memory used on every backend. Default value to 100G.
@VariableMgr.VarAttr(name = EXEC_MEM_LIMIT, needForward = true)
public long maxExecMemByte = 100147483648L;
+ @VariableMgr.VarAttr(name = MAX_SCAN_MEM_RATIO, needForward = true)
+ public double maxScanMemRatio = 0.3;
+ @VariableMgr.VarAttr(name = ENABLE_ADAPTIVE_SCAN, needForward = true)
+ public boolean enableAdaptiveScan = true;
@VariableMgr.VarAttr(name = SCAN_QUEUE_MEM_LIMIT, needForward = true,
description = {"How many bytes of blocks each Scan Instance's block queue can hold",
@@ -5302,6 +5308,8 @@ public class SessionVariable implements Serializable, Writable {
public TQueryOptions toThrift() {
TQueryOptions tResult = new TQueryOptions();
tResult.setMemLimit(maxExecMemByte);
+ tResult.setMaxScanMemRatio(maxScanMemRatio);
+ tResult.setEnableAdaptiveScan(enableAdaptiveScan);
tResult.setLocalExchangeFreeBlocksLimit(localExchangeFreeBlocksLimit);
tResult.setScanQueueMemLimit(maxScanQueueMemByte);
tResult.setMaxScannersConcurrency(maxScannersConcurrency);
diff --git a/gensrc/thrift/PaloInternalService.thrift b/gensrc/thrift/PaloInternalService.thrift
index c33c7565a27..f50bf0c68da 100644
--- a/gensrc/thrift/PaloInternalService.thrift
+++ b/gensrc/thrift/PaloInternalService.thrift
@@ -439,8 +439,6 @@ struct TQueryOptions {
200: optional bool enable_adjust_conjunct_order_by_cost;
// Use paimon-cpp to read Paimon splits on BE
201: optional bool enable_paimon_cpp_reader = false;
-
-
// Whether all fragments of this query are assigned to a single backend.
// When true, the streaming aggregation operator can use more aggressive
// hash table expansion thresholds since all data is local.
@@ -473,6 +471,8 @@ struct TQueryOptions {
209: optional i32 spill_repartition_max_depth = 8
+ 210: optional double max_scan_mem_ratio = 0.3;
+ 211: optional bool enable_adaptive_scan = false;
// For cloud, to control if the content would be written into file cache
// In write path, to control if the content would be written into file cache.
// In read path, read from file cache or remote storage when execute query.
diff --git a/regression-test/data/mv_p0/test_casewhen/test_casewhen.out b/regression-test/data/mv_p0/test_casewhen/test_casewhen.out
index fdf4432d537..4f9aa5b7600 100644
--- a/regression-test/data/mv_p0/test_casewhen/test_casewhen.out
+++ b/regression-test/data/mv_p0/test_casewhen/test_casewhen.out
@@ -1,8 +1,8 @@
-- This file is automatically generated. You should know what you did if you want to edit this
-- !select_star --
1 1 1 2020-02-02 1
-1 1 1 2020-02-02 11
1 1 1 2020-02-02 1
+1 1 1 2020-02-02 11
1 2 2 2020-02-02 1
-- !select_mv --
diff --git a/regression-test/data/unique_with_mow_p0/partial_update/publish/test_partial_update_publish_seq.out b/regression-test/data/unique_with_mow_p0/partial_update/publish/test_partial_update_publish_seq.out
index 44b849dedc0..330c7552341 100644
--- a/regression-test/data/unique_with_mow_p0/partial_update/publish/test_partial_update_publish_seq.out
+++ b/regression-test/data/unique_with_mow_p0/partial_update/publish/test_partial_update_publish_seq.out
@@ -15,12 +15,12 @@
1 20 1 88 1 0 20 4
1 20 99 88 1 0 20 4
2 2 2 2 2 0 2 2
-2 10 99 2 2 0 10 3
2 10 2 88 2 0 10 4
+2 10 99 2 2 0 10 3
2 10 99 88 2 0 10 4
3 3 3 3 3 0 3 2
-3 10 99 3 3 0 10 3
3 5 3 88 3 0 5 4
+3 10 99 3 3 0 10 3
-- !seq_map_2 --
1 20 99 88 33
@@ -29,23 +29,23 @@
-- !inspect --
1 1 1 1 1 0 1 2
+1 9 77 \N \N 0 9 5
1 10 99 1 1 0 10 3
1 20 1 88 1 0 20 4
1 20 99 88 1 0 20 4
-1 9 77 \N \N 0 9 5
1 20 99 88 33 0 20 6
2 2 2 2 2 0 2 2
-2 10 99 2 2 0 10 3
2 10 2 88 2 0 10 4
-2 10 99 88 2 0 10 4
2 10 77 88 2 0 10 5
-2 10 99 88 33 0 10 6
2 10 77 88 33 0 10 6
+2 10 99 2 2 0 10 3
+2 10 99 88 2 0 10 4
+2 10 99 88 33 0 10 6
3 3 3 3 3 0 3 2
-3 10 99 3 3 0 10 3
3 5 3 88 3 0 5 4
-3 50 77 3 3 0 50 5
+3 10 99 3 3 0 10 3
3 10 99 3 33 0 10 6
+3 50 77 3 3 0 50 5
3 50 77 3 33 0 50 6
-- !seq_map_3 --
@@ -53,46 +53,46 @@
-- !inspect --
1 1 1 1 1 0 1 2
+1 9 77 \N \N 0 9 5
1 10 99 1 1 0 10 3
-1 20 99 88 1 0 20 4
1 20 1 88 1 0 20 4
-1 9 77 \N \N 0 9 5
+1 20 99 88 1 0 20 4
1 20 99 88 33 0 20 6
1 80 66 88 33 0 80 7
1 100 66 88 33 1 100 8
1 100 99 88 33 1 100 8
2 2 2 2 2 0 2 2
-2 10 99 2 2 0 10 3
-2 10 99 88 2 0 10 4
2 10 2 88 2 0 10 4
2 10 77 88 2 0 10 5
-2 10 99 88 33 0 10 6
2 10 77 88 33 0 10 6
+2 10 99 2 2 0 10 3
+2 10 99 88 2 0 10 4
+2 10 99 88 33 0 10 6
2 100 66 88 33 0 100 7
2 100 66 88 33 1 100 8
2 100 77 88 33 1 100 8
3 3 3 3 3 0 3 2
-3 10 99 3 3 0 10 3
3 5 3 88 3 0 5 4
-3 50 77 3 3 0 50 5
+3 10 99 3 3 0 10 3
3 10 99 3 33 0 10 6
+3 50 77 3 3 0 50 5
3 50 77 3 33 0 50 6
-3 120 66 3 33 0 120 7
3 100 77 3 33 1 100 8
+3 120 66 3 33 0 120 7
-- !seq_map_4 --
-- !inspect --
1 10 1 1 1 0 10 2
-1 20 55 1 1 0 20 3
1 10 1 1 100 1 10 4
+1 20 55 1 1 0 20 3
1 20 55 1 100 1 20 4
2 10 2 2 2 0 10 2
-2 100 55 2 2 0 100 3
2 10 2 2 100 1 10 4
+2 100 55 2 2 0 100 3
2 100 55 2 100 1 100 4
3 10 3 3 3 0 10 2
-3 120 55 3 3 0 120 3
3 10 3 3 100 1 10 4
+3 120 55 3 3 0 120 3
3 120 55 3 100 1 120 4
diff --git a/regression-test/data/unique_with_mow_p0/partial_update/test_p_seq_publish_read_from_old.out b/regression-test/data/unique_with_mow_p0/partial_update/test_p_seq_publish_read_from_old.out
index 6dc2e7e3b3a..6d1db50dbdf 100644
--- a/regression-test/data/unique_with_mow_p0/partial_update/test_p_seq_publish_read_from_old.out
+++ b/regression-test/data/unique_with_mow_p0/partial_update/test_p_seq_publish_read_from_old.out
@@ -12,19 +12,19 @@
4 \N 987 77777 1234 \N 100
-- !inspect --
+1 \N 987 77777 1234 \N 100 5 0
1 100 1 1 1 1 100 2 0
1 100 1 1 1 1 100 3 1
1 100 987 77777 1 1 100 5 0
-1 \N 987 77777 1234 \N 100 5 0
+2 \N 987 77777 1234 \N 200 5 0
2 100 2 2 2 2 100 2 0
-2 200 2 2 2 2 200 3 1
2 100 987 77777 2 2 100 5 0
-2 \N 987 77777 1234 \N 200 5 0
-3 100 3 3 3 3 100 2 0
+2 200 2 2 2 2 200 3 1
3 50 \N 9876 1234 \N 50 3 1
+3 100 3 3 3 3 100 2 0
3 100 987 77777 3 3 100 5 0
+4 \N 987 77777 1234 \N 100 5 0
4 100 4 4 4 4 100 2 0
4 100 4 4 4 4 100 4 1
4 100 987 77777 4 4 100 5 0
-4 \N 987 77777 1234 \N 100 5 0
diff --git a/regression-test/suites/mv_p0/test_casewhen/test_casewhen.groovy b/regression-test/suites/mv_p0/test_casewhen/test_casewhen.groovy
index 7e63e43bd2c..4750428b119 100644
--- a/regression-test/suites/mv_p0/test_casewhen/test_casewhen.groovy
+++ b/regression-test/suites/mv_p0/test_casewhen/test_casewhen.groovy
@@ -37,7 +37,7 @@ suite ("test_casewhen") {
sql """analyze table sales_records with sync;"""
sql """alter table sales_records modify column record_id set stats ('row_count'='4');"""
- qt_select_star "select * from sales_records order by 1,2;"
+ qt_select_star "select * from sales_records order by 1,2,3,4,5;"
mv_rewrite_success("select store_id, sum(case when sale_amt>10 then 1 else 2 end) from sales_records group by store_id order by 1;", "store_amt")
qt_select_mv "select store_id, sum(case when sale_amt>10 then 1 else 2 end) from sales_records group by store_id order by 1;"
diff --git a/regression-test/suites/unique_with_mow_p0/partial_update/publish/test_partial_update_publish_seq.groovy b/regression-test/suites/unique_with_mow_p0/partial_update/publish/test_partial_update_publish_seq.groovy
index 042c8276777..ea7c5dfdc3a 100644
--- a/regression-test/suites/unique_with_mow_p0/partial_update/publish/test_partial_update_publish_seq.groovy
+++ b/regression-test/suites/unique_with_mow_p0/partial_update/publish/test_partial_update_publish_seq.groovy
@@ -117,7 +117,7 @@ suite("test_partial_update_publish_seq") {
wait_for_publish(txn2, 10)
qt_seq_map_1 "select * from ${table1} order by k1;"
- inspect_rows "select *,__DORIS_DELETE_SIGN__,__DORIS_SEQUENCE_COL__,__DORIS_VERSION_COL__ from ${table1} order by k1,__DORIS_VERSION_COL__;"
+ inspect_rows "select *,__DORIS_DELETE_SIGN__,__DORIS_SEQUENCE_COL__,__DORIS_VERSION_COL__ from ${table1} order by k1,c1,c2,c3,c4,__DORIS_VERSION_COL__;"
// without seq map val, the filled seq val >/=/< conflicting seq val
def txn3 = load_data("k1,c1,c2", "1,9,77\n2,10,77\n3,50,77\n")
@@ -127,7 +127,7 @@ suite("test_partial_update_publish_seq") {
do_streamload_2pc_commit(txn4)
wait_for_publish(txn4, 10)
qt_seq_map_2 "select * from ${table1} order by k1;"
- inspect_rows "select *,__DORIS_DELETE_SIGN__,__DORIS_SEQUENCE_COL__,__DORIS_VERSION_COL__ from ${table1} order by k1,__DORIS_VERSION_COL__;"
+ inspect_rows "select *,__DORIS_DELETE_SIGN__,__DORIS_SEQUENCE_COL__,__DORIS_VERSION_COL__ from ${table1} order by k1,c1,c2,c3,c4,__DORIS_VERSION_COL__;"
// with delete sign and seq col val, >/=/< conflicting seq val
def txn5 = load_data("k1,c1,c2", "1,80,66\n2,100,66\n3,120,66\n")
@@ -138,7 +138,7 @@ suite("test_partial_update_publish_seq") {
wait_for_publish(txn6, 10)
qt_seq_map_3 "select * from ${table1} order by k1;"
- inspect_rows "select *,__DORIS_DELETE_SIGN__,__DORIS_SEQUENCE_COL__,__DORIS_VERSION_COL__ from ${table1} order by k1,__DORIS_VERSION_COL__;"
+ inspect_rows "select *,__DORIS_DELETE_SIGN__,__DORIS_SEQUENCE_COL__,__DORIS_VERSION_COL__ from ${table1} order by k1,c1,c2,c3,c4,__DORIS_VERSION_COL__;"
sql "truncate table ${table1};"
@@ -154,5 +154,5 @@ suite("test_partial_update_publish_seq") {
wait_for_publish(txn8, 10)
qt_seq_map_4 "select * from ${table1} order by k1;"
- inspect_rows "select *,__DORIS_DELETE_SIGN__,__DORIS_SEQUENCE_COL__,__DORIS_VERSION_COL__ from ${table1} order by k1,__DORIS_VERSION_COL__;"
+ inspect_rows "select *,__DORIS_DELETE_SIGN__,__DORIS_SEQUENCE_COL__,__DORIS_VERSION_COL__ from ${table1} order by k1,c1,c2,c3,c4,__DORIS_VERSION_COL__;"
}
diff --git a/regression-test/suites/unique_with_mow_p0/partial_update/test_p_seq_publish_read_from_old.groovy b/regression-test/suites/unique_with_mow_p0/partial_update/test_p_seq_publish_read_from_old.groovy
index 81423436c06..f1a6ea813e8 100644
--- a/regression-test/suites/unique_with_mow_p0/partial_update/test_p_seq_publish_read_from_old.groovy
+++ b/regression-test/suites/unique_with_mow_p0/partial_update/test_p_seq_publish_read_from_old.groovy
@@ -162,6 +162,6 @@ suite("test_p_seq_publish_read_from_old") {
wait_for_publish(txnId2, 60)
sql "sync;"
- qt_sql "select k,v1,v2,v3,v4,v5,__DORIS_SEQUENCE_COL__ from ${tableName} order by k;"
- inspectRows "select k,v1,v2,v3,v4,v5,__DORIS_SEQUENCE_COL__,__DORIS_VERSION_COL__,__DORIS_DELETE_SIGN__ from ${tableName} order by k,__DORIS_VERSION_COL__,__DORIS_SEQUENCE_COL__;"
+ qt_sql "select k,v1,v2,v3,v4,v5,__DORIS_SEQUENCE_COL__ from ${tableName} order by k,v1,v2,v3,v4,v5,__DORIS_SEQUENCE_COL__;"
+ inspectRows "select k,v1,v2,v3,v4,v5,__DORIS_SEQUENCE_COL__,__DORIS_VERSION_COL__,__DORIS_DELETE_SIGN__ from ${tableName} order by k,v1,v2,v3,v4,v5,__DORIS_VERSION_COL__,__DORIS_SEQUENCE_COL__;"
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]