Gabriel39 commented on code in PR #61271:
URL: https://github.com/apache/doris/pull/61271#discussion_r3014021387
##########
be/src/exec/scan/scanner_scheduler.cpp:
##########
@@ -217,113 +211,72 @@ void
ScannerScheduler::_scanner_scan(std::shared_ptr<ScannerContext> ctx,
}
}
- size_t raw_bytes_read = 0;
- bool first_read = true; int64_t limit = scanner->limit();
- // If the first block is full, then it is true. Or the first block + second block > batch_size
- bool has_first_full_block = false;
-
- // During low memory mode, every scan task will return at most 2
block to reduce memory usage.
- while (!eos && raw_bytes_read < raw_bytes_threshold &&
- (!ctx->low_memory_mode() || !has_first_full_block) &&
- (!has_first_full_block || doris::thread_context()
-
->thread_mem_tracker_mgr->limiter_mem_tracker()
- ->check_limit(1))) {
- if (UNLIKELY(ctx->done())) {
- eos = true;
- break;
- }
- if (max_run_time_watch.elapsed_time() >
- config::doris_scanner_max_run_time_ms * 1e6) {
- break;
- }
- DEFER_RELEASE_RESERVED();
- BlockUPtr free_block;
- if (first_read) {
- free_block = ctx->get_free_block(first_read);
- } else {
- if (state->get_query_ctx()
- ->resource_ctx()
- ->task_controller()
- ->is_enable_reserve_memory()) {
- size_t block_avg_bytes =
scanner->get_block_avg_bytes();
- auto st =
thread_context()->thread_mem_tracker_mgr->try_reserve(
- block_avg_bytes);
- if (!st.ok()) {
- handle_reserve_memory_failure(state, ctx, st,
block_avg_bytes);
- break;
+ bool first_read = true;
+ int64_t limit = scanner->limit();
+ if (UNLIKELY(ctx->done())) { eos = true; } else if (!eos) {
+ do {
+ DEFER_RELEASE_RESERVED();
+ BlockUPtr free_block;
+ if (first_read) {
+ free_block = ctx->get_free_block(first_read);
+ } else {
+ if (state->get_query_ctx()
+ ->resource_ctx()
+ ->task_controller()
+ ->is_enable_reserve_memory()) {
+ size_t block_avg_bytes =
scanner->get_block_avg_bytes();
+ auto st =
thread_context()->thread_mem_tracker_mgr->try_reserve(
+ block_avg_bytes);
+ if (!st.ok()) {
+ handle_reserve_memory_failure(state, ctx, st,
block_avg_bytes);
+ break;
+ }
}
+ free_block = ctx->get_free_block(first_read);
}
- free_block = ctx->get_free_block(first_read);
- }
- if (free_block == nullptr) {
- break;
- }
- // We got a new created block or a reused block.
- status = scanner->get_block_after_projects(state,
free_block.get(), &eos);
- first_read = false;
- if (!status.ok()) {
- LOG(WARNING) << "Scan thread read Scanner failed: " <<
status.to_string();
- break;
- }
- // Check column type only after block is read successfully.
- // Or it may cause a crash when the block is not normal.
- _make_sure_virtual_col_is_materialized(scanner,
free_block.get());
- // Projection will truncate useless columns, makes block size
change.
- auto free_block_bytes = free_block->allocated_bytes();
- raw_bytes_read += free_block_bytes;
- if (!scan_task->cached_blocks.empty() &&
- scan_task->cached_blocks.back().first->rows() +
free_block->rows() <=
- ctx->batch_size()) {
- size_t block_size =
scan_task->cached_blocks.back().first->allocated_bytes();
- MutableBlock
mutable_block(scan_task->cached_blocks.back().first.get());
- status = mutable_block.merge(*free_block);
- if (!status.ok()) {
- LOG(WARNING) << "Block merge failed: " <<
status.to_string();
+ if (free_block == nullptr) {
break;
}
- scan_task->cached_blocks.back().second =
mutable_block.allocated_bytes();
- scan_task->cached_blocks.back().first.get()->set_columns(
- std::move(mutable_block.mutable_columns()));
-
- // Return block succeed or not, this free_block is not
used by this scan task any more.
- // If block can be reused, its memory usage will be added
back.
- ctx->return_free_block(std::move(free_block));
-
ctx->inc_block_usage(scan_task->cached_blocks.back().first->allocated_bytes() -
- block_size);
- } else {
- if (!scan_task->cached_blocks.empty()) {
- has_first_full_block = true;
+ // We got a new created block or a reused block.
+ status = scanner->get_block_after_projects(state,
free_block.get(), &eos);
+ first_read = false;
+ if (!status.ok()) {
+ LOG(WARNING) << "Scan thread read Scanner failed: " <<
status.to_string();
+ break;
}
+ // Check column type only after block is read successfully.
+ // Or it may cause a crash when the block is not normal.
+ _make_sure_virtual_col_is_materialized(scanner,
free_block.get());
+ // Projection will truncate useless columns, makes block
size change.
+ auto free_block_bytes = free_block->allocated_bytes();
+
ctx->reestimated_block_mem_bytes(cast_set<int64_t>(free_block_bytes));
+ DCHECK(scan_task->cached_block == nullptr);
ctx->inc_block_usage(free_block->allocated_bytes());
-
scan_task->cached_blocks.emplace_back(std::move(free_block), free_block_bytes);
- }
-
- if (limit > 0 && limit < ctx->batch_size()) {
- // If this scanner has limit, and less than batch size,
- // return immediately and no need to wait
raw_bytes_threshold.
- // This can save time that each scanner may only return a
small number of rows,
- // but rows are enough from all scanners.
- // If not break, the query like "select * from tbl where
id=1 limit 10"
- // may scan a lot data when the "id=1"'s filter ratio is
high.
- // If limit is larger than batch size, this rule is
skipped,
- // to avoid user specify a large limit and causing too
much small blocks.
- break;
- }
+ scan_task->cached_block = std::move(free_block);
+
+ if (limit > 0 && limit < ctx->batch_size()) {
+ // If this scanner has limit, and less than batch size,
+ // return immediately and no need to wait
raw_bytes_threshold.
+ // This can save time that each scanner may only
return a small number of rows,
+ // but rows are enough from all scanners.
+ // If not break, the query like "select * from tbl
where id=1 limit 10"
+ // may scan a lot data when the "id=1"'s filter ratio
is high.
+ // If limit is larger than batch size, this rule is
skipped,
+ // to avoid user specify a large limit and causing too
much small blocks.
+ break;
+ }
- if (scan_task->cached_blocks.back().first->rows() > 0) {
- auto block_avg_bytes =
(scan_task->cached_blocks.back().first->bytes() +
-
scan_task->cached_blocks.back().first->rows() - 1) /
-
scan_task->cached_blocks.back().first->rows() *
- ctx->batch_size();
- scanner->update_block_avg_bytes(block_avg_bytes);
- }
- if (ctx->low_memory_mode()) {
- ctx->clear_free_blocks();
- if (raw_bytes_threshold >
ctx->low_memory_mode_scan_bytes_per_scanner()) {
- raw_bytes_threshold =
ctx->low_memory_mode_scan_bytes_per_scanner();
+ if (scan_task->cached_block->rows() > 0) {
+ auto block_avg_bytes =
(scan_task->cached_block->bytes() +
+
scan_task->cached_block->rows() - 1) /
+ scan_task->cached_block->rows()
* ctx->batch_size();
+ scanner->update_block_avg_bytes(block_avg_bytes);
}
- }
- } // end for while
+ if (ctx->low_memory_mode()) {
+ ctx->clear_free_blocks();
+ }
+ } while (false);
+ }
Review Comment:
This is fine, because we need to update the memory usage after each block.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]