github-actions[bot] commented on code in PR #61271:
URL: https://github.com/apache/doris/pull/61271#discussion_r3012966300
##########
be/src/exec/scan/scanner_context.h:
##########
@@ -52,32 +52,147 @@ class Dependency;
class Scanner;
class ScannerDelegate;
class ScannerScheduler;
-class ScannerScheduler;
class TaskExecutor;
class TaskHandle;
+struct MemLimiter;
+
+// Query-level memory arbitrator that distributes memory fairly across all
scan contexts
+struct MemShareArbitrator {
+ ENABLE_FACTORY_CREATOR(MemShareArbitrator)
+ TUniqueId query_id;
+ int64_t query_mem_limit = 0;
+ int64_t mem_limit = 0;
+ std::atomic<int64_t> total_mem_bytes = 0;
+
+ MemShareArbitrator(const TUniqueId& qid, int64_t query_mem_limit, double
max_scan_ratio);
+
+ // Update memory allocation when scanner memory usage changes
+ // Returns new scan memory limit for this context
+ int64_t update_mem_bytes(int64_t old_value, int64_t new_value);
+ void register_scan_node();
+ std::string debug_string() const {
+ return fmt::format("query_id: {}, query_mem_limit: {}, mem_limit: {}",
print_id(query_id),
+ query_mem_limit, mem_limit);
+ }
+};
+
+// Scan-context-level memory limiter that controls scanner concurrency based
on memory
+struct MemLimiter {
+private:
+ TUniqueId query_id;
+ mutable std::mutex lock;
+ // Parallelism of the scan operator
+ const int64_t parallelism = 0;
+ const bool serial_operator = false;
+ const int64_t operator_mem_limit;
+ std::atomic<int64_t> running_tasks_count = 0;
+
+ std::atomic<int64_t> estimated_block_mem_bytes = 0;
+ int64_t estimated_block_mem_bytes_update_count = 0;
+ int64_t arb_mem_bytes = 0;
+ std::atomic<int64_t> open_tasks_count = 0;
Review Comment:
**[Bug: Data Race]** `arb_mem_bytes` is a plain `int64_t` but is accessed
concurrently from multiple threads. Multiple `ScannerContext` instances share
the same `MemLimiter` (one per scan node), and `update_arb_mem_bytes()` /
`get_arb_scanner_mem_bytes()` can be called from different pipeline task
threads simultaneously. The per-context `_transfer_lock` does NOT protect the
shared `MemLimiter` — each context has its own lock.
This should be `std::atomic<int64_t>` like the other shared fields
(`running_tasks_count`, `estimated_block_mem_bytes`, `open_tasks_count`,
`mem_limit`).
##########
be/src/exec/scan/scanner_context.cpp:
##########
@@ -181,13 +409,22 @@ Status ScannerContext::init() {
ScannerContext::~ScannerContext() {
SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(_resource_ctx->memory_context()->mem_tracker());
- _tasks_queue.clear();
+ _completed_tasks.clear();
BlockUPtr block;
while (_free_blocks.try_dequeue(block)) {
// do nothing
}
block.reset();
DorisMetrics::instance()->scanner_ctx_cnt->increment(-1);
+
+ // Cleanup memory limiter if last context closing
+ if (_enable_adaptive_scanners) {
+ if (_scanner_mem_limiter->update_open_tasks_count(-1) == 1) {
Review Comment:
**[Bug: Lifecycle - related to MemLimiter destructor DCHECK]** This is the
destructor counterpart of the lifecycle bug. If `init()` failed before calling
`update_open_tasks_count(1)`, this `update_open_tasks_count(-1)` will drive
`open_tasks_count` to -1, triggering the `DCHECK_EQ(open_tasks_count, 0)` in
`~MemLimiter()`.
Also note: `update_open_tasks_count(-1)` returns the *previous* value via
`fetch_add`. So the check `== 1` means "I was the last one" — but if the count
went negative, this comparison wouldn't catch it. Consider adding a
`DCHECK(open_tasks_count >= 0)` after the decrement as a safety net.
##########
be/src/common/config.h:
##########
@@ -402,8 +402,9 @@ DECLARE_mInt32(doris_scan_range_max_mb);
DECLARE_mInt32(doris_scanner_row_num);
// single read execute fragment row bytes
DECLARE_mInt32(doris_scanner_row_bytes);
-// single read execute fragment max run time millseconds
+// Deprecated. single read execute fragment max run time millseconds
DECLARE_mInt32(doris_scanner_max_run_time_ms);
+DECLARE_mInt32(doris_scanner_dynamic_interval_ms);
// (Advanced) Maximum size of per-query receive-side buffer
Review Comment:
**[Minor]** `doris_scanner_dynamic_interval_ms` is declared without a
description comment, unlike all neighboring config declarations which have
comments explaining their purpose. Please add a comment like:
```cpp
// Minimum interval in milliseconds between adaptive scanner concurrency
adjustments
DECLARE_mInt32(doris_scanner_dynamic_interval_ms);
```
##########
be/src/exec/scan/scanner_scheduler.cpp:
##########
@@ -217,113 +211,72 @@ void
ScannerScheduler::_scanner_scan(std::shared_ptr<ScannerContext> ctx,
}
}
- size_t raw_bytes_read = 0;
- bool first_read = true;
- int64_t limit = scanner->limit();
- // If the first block is full, then it is true. Or the first block
+ second block > batch_size
- bool has_first_full_block = false;
-
- // During low memory mode, every scan task will return at most 2
block to reduce memory usage.
- while (!eos && raw_bytes_read < raw_bytes_threshold &&
- (!ctx->low_memory_mode() || !has_first_full_block) &&
- (!has_first_full_block || doris::thread_context()
-
->thread_mem_tracker_mgr->limiter_mem_tracker()
- ->check_limit(1))) {
- if (UNLIKELY(ctx->done())) {
- eos = true;
- break;
- }
- if (max_run_time_watch.elapsed_time() >
- config::doris_scanner_max_run_time_ms * 1e6) {
- break;
- }
- DEFER_RELEASE_RESERVED();
- BlockUPtr free_block;
- if (first_read) {
- free_block = ctx->get_free_block(first_read);
- } else {
- if (state->get_query_ctx()
- ->resource_ctx()
- ->task_controller()
- ->is_enable_reserve_memory()) {
- size_t block_avg_bytes =
scanner->get_block_avg_bytes();
- auto st =
thread_context()->thread_mem_tracker_mgr->try_reserve(
- block_avg_bytes);
- if (!st.ok()) {
- handle_reserve_memory_failure(state, ctx, st,
block_avg_bytes);
- break;
+ bool first_read = true;
+ int64_t limit = scanner->limit();
+ if (UNLIKELY(ctx->done())) {
+ eos = true;
+ } else if (!eos) {
+ do {
+ DEFER_RELEASE_RESERVED();
+ BlockUPtr free_block;
+ if (first_read) {
+ free_block = ctx->get_free_block(first_read);
+ } else {
+ if (state->get_query_ctx()
+ ->resource_ctx()
+ ->task_controller()
+ ->is_enable_reserve_memory()) {
+ size_t block_avg_bytes =
scanner->get_block_avg_bytes();
+ auto st =
thread_context()->thread_mem_tracker_mgr->try_reserve(
+ block_avg_bytes);
+ if (!st.ok()) {
+ handle_reserve_memory_failure(state, ctx, st,
block_avg_bytes);
+ break;
+ }
}
+ free_block = ctx->get_free_block(first_read);
}
- free_block = ctx->get_free_block(first_read);
- }
- if (free_block == nullptr) {
- break;
- }
- // We got a new created block or a reused block.
- status = scanner->get_block_after_projects(state,
free_block.get(), &eos);
- first_read = false;
- if (!status.ok()) {
- LOG(WARNING) << "Scan thread read Scanner failed: " <<
status.to_string();
- break;
- }
- // Check column type only after block is read successfully.
- // Or it may cause a crash when the block is not normal.
- _make_sure_virtual_col_is_materialized(scanner,
free_block.get());
- // Projection will truncate useless columns, makes block size
change.
- auto free_block_bytes = free_block->allocated_bytes();
- raw_bytes_read += free_block_bytes;
- if (!scan_task->cached_blocks.empty() &&
- scan_task->cached_blocks.back().first->rows() +
free_block->rows() <=
- ctx->batch_size()) {
- size_t block_size =
scan_task->cached_blocks.back().first->allocated_bytes();
- MutableBlock
mutable_block(scan_task->cached_blocks.back().first.get());
- status = mutable_block.merge(*free_block);
- if (!status.ok()) {
- LOG(WARNING) << "Block merge failed: " <<
status.to_string();
+ if (free_block == nullptr) {
break;
}
- scan_task->cached_blocks.back().second =
mutable_block.allocated_bytes();
- scan_task->cached_blocks.back().first.get()->set_columns(
- std::move(mutable_block.mutable_columns()));
-
- // Return block succeed or not, this free_block is not
used by this scan task any more.
- // If block can be reused, its memory usage will be added
back.
- ctx->return_free_block(std::move(free_block));
-
ctx->inc_block_usage(scan_task->cached_blocks.back().first->allocated_bytes() -
- block_size);
- } else {
- if (!scan_task->cached_blocks.empty()) {
- has_first_full_block = true;
+ // We got a new created block or a reused block.
+ status = scanner->get_block_after_projects(state,
free_block.get(), &eos);
+ first_read = false;
+ if (!status.ok()) {
+ LOG(WARNING) << "Scan thread read Scanner failed: " <<
status.to_string();
+ break;
}
+ // Check column type only after block is read successfully.
+ // Or it may cause a crash when the block is not normal.
+ _make_sure_virtual_col_is_materialized(scanner,
free_block.get());
+ // Projection will truncate useless columns, makes block
size change.
+ auto free_block_bytes = free_block->allocated_bytes();
+
ctx->reestimated_block_mem_bytes(cast_set<int64_t>(free_block_bytes));
+ DCHECK(scan_task->cached_block == nullptr);
ctx->inc_block_usage(free_block->allocated_bytes());
-
scan_task->cached_blocks.emplace_back(std::move(free_block), free_block_bytes);
- }
-
- if (limit > 0 && limit < ctx->batch_size()) {
- // If this scanner has limit, and less than batch size,
- // return immediately and no need to wait
raw_bytes_threshold.
- // This can save time that each scanner may only return a
small number of rows,
- // but rows are enough from all scanners.
- // If not break, the query like "select * from tbl where
id=1 limit 10"
- // may scan a lot data when the "id=1"'s filter ratio is
high.
- // If limit is larger than batch size, this rule is
skipped,
- // to avoid user specify a large limit and causing too
much small blocks.
- break;
- }
+ scan_task->cached_block = std::move(free_block);
+
+ if (limit > 0 && limit < ctx->batch_size()) {
+ // If this scanner has limit, and less than batch size,
+ // return immediately and no need to wait
raw_bytes_threshold.
+ // This can save time that each scanner may only
return a small number of rows,
+ // but rows are enough from all scanners.
+ // If not break, the query like "select * from tbl
where id=1 limit 10"
+ // may scan a lot data when the "id=1"'s filter ratio
is high.
+ // If limit is larger than batch size, this rule is
skipped,
+ // to avoid user specify a large limit and causing too
much small blocks.
+ break;
+ }
- if (scan_task->cached_blocks.back().first->rows() > 0) {
- auto block_avg_bytes =
(scan_task->cached_blocks.back().first->bytes() +
-
scan_task->cached_blocks.back().first->rows() - 1) /
-
scan_task->cached_blocks.back().first->rows() *
- ctx->batch_size();
- scanner->update_block_avg_bytes(block_avg_bytes);
- }
- if (ctx->low_memory_mode()) {
- ctx->clear_free_blocks();
- if (raw_bytes_threshold >
ctx->low_memory_mode_scan_bytes_per_scanner()) {
- raw_bytes_threshold =
ctx->low_memory_mode_scan_bytes_per_scanner();
+ if (scan_task->cached_block->rows() > 0) {
+ auto block_avg_bytes =
(scan_task->cached_block->bytes() +
+
scan_task->cached_block->rows() - 1) /
+ scan_task->cached_block->rows()
* ctx->batch_size();
+ scanner->update_block_avg_bytes(block_avg_bytes);
}
- }
- } // end for while
+ if (ctx->low_memory_mode()) {
+ ctx->clear_free_blocks();
+ }
+ } while (false);
+ }
Review Comment:
**[Performance / Behavioral Change]** This `do { ... } while(false)`
effectively makes each scanner task read exactly ONE block instead of batching
multiple blocks. This is a significant behavioral change:
1. **Scheduling overhead**: For fast local scans (SSD/memory), the per-task
scheduling cost (thread pool dispatch, lock acquisition, state transitions) is
no longer amortized across multiple blocks.
2. **Dead code**: `raw_bytes_threshold`, the `first_read` else-branch with
memory reservation, and the low-memory multi-block logic are now unreachable.
3. **Block merging removed**: Small blocks are no longer merged together,
which may increase downstream overhead.
The TODO comment acknowledges this is intentional for memory-aware
scheduling, but the performance regression for fast scans should be measured.
Consider keeping multi-block batching as a fallback when adaptive scheduling is
disabled (`_enable_adaptive_scanners == false`).
##########
be/src/exec/scan/scanner_context.h:
##########
@@ -52,32 +52,147 @@ class Dependency;
class Scanner;
class ScannerDelegate;
class ScannerScheduler;
-class ScannerScheduler;
class TaskExecutor;
class TaskHandle;
+struct MemLimiter;
+
+// Query-level memory arbitrator that distributes memory fairly across all
scan contexts
+struct MemShareArbitrator {
+ ENABLE_FACTORY_CREATOR(MemShareArbitrator)
+ TUniqueId query_id;
+ int64_t query_mem_limit = 0;
+ int64_t mem_limit = 0;
+ std::atomic<int64_t> total_mem_bytes = 0;
+
+ MemShareArbitrator(const TUniqueId& qid, int64_t query_mem_limit, double
max_scan_ratio);
+
+ // Update memory allocation when scanner memory usage changes
+ // Returns new scan memory limit for this context
+ int64_t update_mem_bytes(int64_t old_value, int64_t new_value);
+ void register_scan_node();
+ std::string debug_string() const {
+ return fmt::format("query_id: {}, query_mem_limit: {}, mem_limit: {}",
print_id(query_id),
+ query_mem_limit, mem_limit);
+ }
+};
+
+// Scan-context-level memory limiter that controls scanner concurrency based
on memory
+struct MemLimiter {
+private:
+ TUniqueId query_id;
+ mutable std::mutex lock;
+ // Parallelism of the scan operator
+ const int64_t parallelism = 0;
+ const bool serial_operator = false;
+ const int64_t operator_mem_limit;
+ std::atomic<int64_t> running_tasks_count = 0;
+
+ std::atomic<int64_t> estimated_block_mem_bytes = 0;
+ int64_t estimated_block_mem_bytes_update_count = 0;
+ int64_t arb_mem_bytes = 0;
+ std::atomic<int64_t> open_tasks_count = 0;
+
+ // Memory limit for this scan node (shared by all instances), updated by
memory share arbitrator
+ std::atomic<int64_t> mem_limit = 0;
+
+public:
+ ENABLE_FACTORY_CREATOR(MemLimiter)
+ MemLimiter(const TUniqueId& qid, int64_t parallelism, bool
serial_operator_, int64_t mem_limit)
+ : query_id(qid),
+ parallelism(parallelism),
+ serial_operator(serial_operator_),
+ operator_mem_limit(mem_limit) {}
+ ~MemLimiter() { DCHECK_EQ(open_tasks_count, 0); }
+
+ // Calculate available scanner count based on memory limit
+ int available_scanner_count(int ins_idx) const;
+
+ int64_t update_running_tasks_count(int delta) { return running_tasks_count
+= delta; }
+
+ // Re-estimated the average memory usage of a block, and update the
estimated_block_mem_bytes accordingly.
+ void reestimated_block_mem_bytes(int64_t value);
+ void update_mem_limit(int64_t value) { mem_limit = value; }
+ void update_arb_mem_bytes(int64_t value) {
+ value = std::min(value, operator_mem_limit);
+ arb_mem_bytes = value;
+ }
+ int64_t get_arb_scanner_mem_bytes() const { return arb_mem_bytes; }
+
+ int64_t get_estimated_block_mem_bytes() const { return
estimated_block_mem_bytes; }
+
+ int64_t update_open_tasks_count(int delta) { return
open_tasks_count.fetch_add(delta); }
+ std::string debug_string() const {
+ return fmt::format(
+ "query_id: {}, parallelism: {}, serial_operator: {},
operator_mem_limit: {}, "
+ "running_tasks_count: {}, estimated_block_mem_bytes: {}, "
+ "estimated_block_mem_bytes_update_count: {}, arb_mem_bytes:
{}, "
+ "open_tasks_count: {}, mem_limit: {}",
+ print_id(query_id), parallelism, serial_operator,
operator_mem_limit,
+ running_tasks_count.load(), estimated_block_mem_bytes.load(),
+ estimated_block_mem_bytes_update_count, arb_mem_bytes,
open_tasks_count, mem_limit);
+ }
+};
+
+// Adaptive processor for dynamic scanner concurrency adjustment
+struct ScannerAdaptiveProcessor {
+ ENABLE_FACTORY_CREATOR(ScannerAdaptiveProcessor)
+ ScannerAdaptiveProcessor() = default;
+ ~ScannerAdaptiveProcessor() = default;
+ // Expected scanners in this cycle
+
+ int expected_scanners = 0;
+ // Timing metrics
+ // int64_t context_start_time = 0;
+ // int64_t scanner_total_halt_time = 0;
Review Comment:
**[Code Hygiene]** The `ScannerAdaptiveProcessor` struct contains ~15 lines
of commented-out fields. Combined with ~85 lines of commented-out code in
`_available_pickup_scanner_count()` (scanner_context.cpp), this totals ~100
lines of dead code. Per the `TODO(gabriel)` comment, this is intended for
future IO-based adaptive scheduling.
Suggestion: Remove the commented-out code and reference a tracking issue
instead. Commented-out code tends to bit-rot and adds noise during code review.
##########
be/src/exec/scan/scanner_context.h:
##########
@@ -52,32 +52,147 @@ class Dependency;
class Scanner;
class ScannerDelegate;
class ScannerScheduler;
-class ScannerScheduler;
class TaskExecutor;
class TaskHandle;
+struct MemLimiter;
+
+// Query-level memory arbitrator that distributes memory fairly across all
scan contexts
+struct MemShareArbitrator {
+ ENABLE_FACTORY_CREATOR(MemShareArbitrator)
+ TUniqueId query_id;
+ int64_t query_mem_limit = 0;
+ int64_t mem_limit = 0;
+ std::atomic<int64_t> total_mem_bytes = 0;
+
+ MemShareArbitrator(const TUniqueId& qid, int64_t query_mem_limit, double
max_scan_ratio);
+
+ // Update memory allocation when scanner memory usage changes
+ // Returns new scan memory limit for this context
+ int64_t update_mem_bytes(int64_t old_value, int64_t new_value);
+ void register_scan_node();
+ std::string debug_string() const {
+ return fmt::format("query_id: {}, query_mem_limit: {}, mem_limit: {}",
print_id(query_id),
+ query_mem_limit, mem_limit);
+ }
+};
+
+// Scan-context-level memory limiter that controls scanner concurrency based
on memory
+struct MemLimiter {
+private:
+ TUniqueId query_id;
+ mutable std::mutex lock;
+ // Parallelism of the scan operator
+ const int64_t parallelism = 0;
+ const bool serial_operator = false;
+ const int64_t operator_mem_limit;
+ std::atomic<int64_t> running_tasks_count = 0;
+
+ std::atomic<int64_t> estimated_block_mem_bytes = 0;
+ int64_t estimated_block_mem_bytes_update_count = 0;
+ int64_t arb_mem_bytes = 0;
+ std::atomic<int64_t> open_tasks_count = 0;
+
+ // Memory limit for this scan node (shared by all instances), updated by
memory share arbitrator
+ std::atomic<int64_t> mem_limit = 0;
+
+public:
+ ENABLE_FACTORY_CREATOR(MemLimiter)
+ MemLimiter(const TUniqueId& qid, int64_t parallelism, bool
serial_operator_, int64_t mem_limit)
+ : query_id(qid),
+ parallelism(parallelism),
+ serial_operator(serial_operator_),
+ operator_mem_limit(mem_limit) {}
+ ~MemLimiter() { DCHECK_EQ(open_tasks_count, 0); }
+
Review Comment:
**[Bug: Lifecycle]** The `DCHECK_EQ(open_tasks_count, 0)` in the destructor
will fire if any `ScannerContext::init()` fails after the constructor sets
`_enable_adaptive_scanners = true`.
The issue: `_enable_adaptive_scanners` is set in the constructor
(unconditionally from the parameter), but `update_open_tasks_count(1)` is only
called inside `init()` (at scanner_context.cpp around the `if
(_enable_adaptive_scanners)` block). If `init()` fails before reaching that
point (e.g., `DORIS_TRY` throws), the destructor still calls
`update_open_tasks_count(-1)` because `_enable_adaptive_scanners` is true,
driving `open_tasks_count` to -1.
Suggestion: add an `_adaptive_initialized` flag that is set after the
`update_open_tasks_count(1)` call in `init()`, and check it in the destructor
instead of (or in addition to) `_enable_adaptive_scanners`.
##########
be/src/exec/scan/scanner_context.cpp:
##########
@@ -52,11 +52,82 @@ namespace doris {
using namespace std::chrono_literals;
#include "common/compile_check_begin.h"
+// ==================== MemShareArbitrator ====================
+static constexpr int64_t DEFAULT_SCANNER_MEM_BYTES = 64 * 1024 * 1024; // 64MB
default
+
+MemShareArbitrator::MemShareArbitrator(const TUniqueId& qid, int64_t
query_mem_limit,
+ double max_scan_ratio)
+ : query_id(qid),
+ query_mem_limit(query_mem_limit),
+ mem_limit(std::max<int64_t>(
+ 1, static_cast<int64_t>(static_cast<double>(query_mem_limit)
* max_scan_ratio))) {
+}
+
+void MemShareArbitrator::register_scan_node() {
+ total_mem_bytes.fetch_add(DEFAULT_SCANNER_MEM_BYTES);
+}
+
+int64_t MemShareArbitrator::update_mem_bytes(int64_t old_value, int64_t
new_value) {
+ int64_t diff = new_value - old_value;
+ int64_t total = total_mem_bytes.fetch_add(diff) + diff;
+ if (new_value == 0) return 0;
+ if (total <= 0) return mem_limit;
+ // Proportional sharing: allocate based on this context's share of total
usage
+ double ratio = static_cast<double>(new_value) /
static_cast<double>(std::max(total, new_value));
+ return static_cast<int64_t>(static_cast<double>(mem_limit) * ratio);
+}
+
+// ==================== MemLimiter ====================
+int MemLimiter::available_scanner_count(int ins_idx) const {
+ int64_t mem_limit_value = mem_limit.load();
+ int64_t running_tasks_count_value = running_tasks_count.load();
+ int64_t estimated_block_mem_bytes_value = get_estimated_block_mem_bytes();
+
+ int64_t max_count = std::max(1L, mem_limit_value /
estimated_block_mem_bytes_value);
+ int64_t avail_count = max_count;
+ int64_t per_count = avail_count / parallelism;
+ if (serial_operator) {
+ per_count += (avail_count - per_count * parallelism);
+ } else if (ins_idx < avail_count - per_count * parallelism) {
+ per_count += 1;
+ }
+
+ VLOG_DEBUG << "available_scanner_count. max_count=" << max_count << "("
+ << running_tasks_count_value << "/" <<
estimated_block_mem_bytes_value
+ << "), operator_mem_limit = " << operator_mem_limit
+ << ", running_tasks_count = " << running_tasks_count_value
+ << ", parallelism = " << parallelism << ", avail_count = " <<
avail_count
+ << ", ins_id = " << ins_idx << ", per_count = " << per_count
+ << " debug_string: " << debug_string();
+
+ return cast_set<int>(per_count);
+}
+
+void MemLimiter::reestimated_block_mem_bytes(int64_t value) {
+ if (value == 0) return;
+ value = std::min(value, operator_mem_limit);
Review Comment:
**[Robustness]** `available_scanner_count()` divides by
`estimated_block_mem_bytes_value` which starts at 0. Currently safe because
`reestimated_block_mem_bytes(DEFAULT_SCANNER_MEM_BYTES)` is always called in
`init()` before this is ever invoked, but there's no defensive guard. Per
AGENTS.md coding standards, a `DCHECK(estimated_block_mem_bytes_value > 0)`
assertion would be appropriate here to catch any future ordering bugs.
##########
be/src/exec/scan/scanner_context.h:
##########
@@ -52,32 +52,147 @@ class Dependency;
class Scanner;
class ScannerDelegate;
class ScannerScheduler;
-class ScannerScheduler;
class TaskExecutor;
class TaskHandle;
+struct MemLimiter;
+
+// Query-level memory arbitrator that distributes memory fairly across all
scan contexts
+struct MemShareArbitrator {
+ ENABLE_FACTORY_CREATOR(MemShareArbitrator)
+ TUniqueId query_id;
+ int64_t query_mem_limit = 0;
+ int64_t mem_limit = 0;
+ std::atomic<int64_t> total_mem_bytes = 0;
+
+ MemShareArbitrator(const TUniqueId& qid, int64_t query_mem_limit, double
max_scan_ratio);
+
+ // Update memory allocation when scanner memory usage changes
+ // Returns new scan memory limit for this context
+ int64_t update_mem_bytes(int64_t old_value, int64_t new_value);
+ void register_scan_node();
+ std::string debug_string() const {
+ return fmt::format("query_id: {}, query_mem_limit: {}, mem_limit: {}",
print_id(query_id),
+ query_mem_limit, mem_limit);
+ }
+};
+
+// Scan-context-level memory limiter that controls scanner concurrency based
on memory
+struct MemLimiter {
+private:
+ TUniqueId query_id;
+ mutable std::mutex lock;
+ // Parallelism of the scan operator
+ const int64_t parallelism = 0;
+ const bool serial_operator = false;
+ const int64_t operator_mem_limit;
+ std::atomic<int64_t> running_tasks_count = 0;
+
+ std::atomic<int64_t> estimated_block_mem_bytes = 0;
+ int64_t estimated_block_mem_bytes_update_count = 0;
+ int64_t arb_mem_bytes = 0;
Review Comment:
**[Bug: Data Race]** `estimated_block_mem_bytes_update_count` is a plain
`int64_t`. It is protected by `lock` in `reestimated_block_mem_bytes()`, but
read without any lock in `debug_string()`. Note that `debug_string()` is called
from `available_scanner_count()` via `VLOG_DEBUG` (which does not hold the
lock), and also from VLOG inside `reestimated_block_mem_bytes()` itself (where
the lock is already held — and since it's `std::mutex` not
`std::recursive_mutex`, acquiring it again would deadlock).
Suggestion: make this `std::atomic<int64_t>`, consistent with other fields.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]