This is an automated email from the ASF dual-hosted git repository. jacktengg pushed a commit to branch spill_and_reserve in repository https://gitbox.apache.org/repos/asf/doris.git
commit 7982d9c69098540ec11535b6469f3db708012a67 Author: jacktengg <tengjianp...@selectdb.com> AuthorDate: Thu Dec 12 23:06:53 2024 +0800 improve log --- .../exec/partitioned_hash_join_sink_operator.cpp | 38 ++++++++++---------- be/src/pipeline/pipeline_task.cpp | 40 ++++++++++++++-------- be/src/runtime/query_context.cpp | 19 +++++----- 3 files changed, 55 insertions(+), 42 deletions(-) diff --git a/be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp b/be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp index abf81a068e7..09e13eb465d 100644 --- a/be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp +++ b/be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp @@ -29,6 +29,7 @@ #include "pipeline/pipeline_task.h" #include "runtime/fragment_mgr.h" #include "util/mem_info.h" +#include "util/pretty_printer.h" #include "util/runtime_profile.h" #include "vec/spill/spill_stream.h" #include "vec/spill/spill_stream_manager.h" @@ -286,8 +287,8 @@ Status PartitionedHashJoinSinkLocalState::_revoke_unpartitioned_block( Status PartitionedHashJoinSinkLocalState::revoke_memory( RuntimeState* state, const std::shared_ptr<SpillContext>& spill_context) { SCOPED_TIMER(_spill_total_timer); - VLOG_DEBUG << "Query: " << print_id(state->query_id()) << ", task: " << state->task_id() - << " hash join sink " << _parent->node_id() << " revoke_memory" + VLOG_DEBUG << "Query: " << print_id(state->query_id()) << ", task " << state->task_id() + << " sink " << _parent->node_id() << " revoke_memory" << ", eos: " << _child_eos; DCHECK_EQ(_spilling_task_count, 0); CHECK_EQ(_spill_dependency->is_blocked_by(nullptr), nullptr); @@ -312,9 +313,9 @@ Status PartitionedHashJoinSinkLocalState::revoke_memory( Status status; if (_child_eos) { - VLOG_DEBUG << "Query:" << print_id(this->state()->query_id()) << ", hash join sink " - << _parent->node_id() << " set_ready_to_read" - << ", task id: " << state->task_id(); + VLOG_DEBUG << "Query:" << print_id(this->state()->query_id()) << 
", task " + << state->task_id() << " sink " << _parent->node_id() + << " set_ready_to_read"; std::for_each(_shared_state->partitioned_build_blocks.begin(), _shared_state->partitioned_build_blocks.end(), [&](auto& block) { if (block) { @@ -398,9 +399,8 @@ Status PartitionedHashJoinSinkLocalState::revoke_memory( } if (_child_eos) { - VLOG_DEBUG << "Query:" << print_id(state->query_id()) << ", hash join sink " - << _parent->node_id() << " set_ready_to_read" - << ", task id: " << state->task_id(); + VLOG_DEBUG << "Query:" << print_id(state->query_id()) << ", task " << state->task_id() + << " sink " << _parent->node_id() << " set_ready_to_read"; std::for_each(_shared_state->partitioned_build_blocks.begin(), _shared_state->partitioned_build_blocks.end(), [&](auto& block) { if (block) { @@ -587,9 +587,9 @@ Status PartitionedHashJoinSinkOperatorX::sink(RuntimeState* state, vectorized::B const auto need_to_spill = local_state._shared_state->need_to_spill; if (rows == 0) { if (eos) { - VLOG_DEBUG << "Query: " << print_id(state->query_id()) << ", hash join sink " - << node_id() << " sink eos, set_ready_to_read" - << ", task id: " << state->task_id() << ", need spill: " << need_to_spill; + VLOG_DEBUG << "Query: " << print_id(state->query_id()) << ", task " << state->task_id() + << " sink " << node_id() << " eos, set_ready_to_read" + << ", need spill: " << need_to_spill; if (need_to_spill) { return revoke_memory(state, nullptr); @@ -605,11 +605,11 @@ Status PartitionedHashJoinSinkOperatorX::sink(RuntimeState* state, vectorized::B Defer defer {[&]() { local_state.update_memory_usage(); }}; RETURN_IF_ERROR(_inner_sink_operator->sink( local_state._shared_state->inner_runtime_state.get(), in_block, eos)); - VLOG_DEBUG << "Query: " << print_id(state->query_id()) << "hash join sink " - << node_id() << " sink eos, set_ready_to_read" - << ", task id: " << state->task_id() << ", nonspill build usage: " - << _inner_sink_operator->get_memory_usage( - 
local_state._shared_state->inner_runtime_state.get()); + VLOG_DEBUG << "Query: " << print_id(state->query_id()) << " task " + << state->task_id() << " sink " << node_id() << " eos, set_ready_to_read" + << ", nonspill build usage: " + << PrettyPrinter::print_bytes(_inner_sink_operator->get_memory_usage( + local_state._shared_state->inner_runtime_state.get())); } std::for_each(local_state._shared_state->partitioned_build_blocks.begin(), @@ -646,9 +646,9 @@ Status PartitionedHashJoinSinkOperatorX::sink(RuntimeState* state, vectorized::B local_state._shared_state->inner_runtime_state.get(), in_block, eos)); if (eos) { - VLOG_DEBUG << "Query: " << print_id(state->query_id()) << ", hash join sink " - << node_id() << " sink eos, set_ready_to_read" - << ", task id: " << state->task_id() << ", nonspill build usage: " + VLOG_DEBUG << "Query: " << print_id(state->query_id()) << " task " << state->task_id() + << " sink " << node_id() << " eos, set_ready_to_read" + << ", nonspill build usage: " << _inner_sink_operator->get_memory_usage( local_state._shared_state->inner_runtime_state.get()); local_state._dependency->set_ready_to_read(); diff --git a/be/src/pipeline/pipeline_task.cpp b/be/src/pipeline/pipeline_task.cpp index 0ae87287d96..a099ac66421 100644 --- a/be/src/pipeline/pipeline_task.cpp +++ b/be/src/pipeline/pipeline_task.cpp @@ -421,13 +421,19 @@ Status PipelineTask::execute(bool* eos) { COUNTER_UPDATE(_memory_reserve_times, 1); if (!st.ok()) { COUNTER_UPDATE(_memory_reserve_failed_times, 1); - VLOG_DEBUG << "Query: " << print_id(query_id) << ", try to reserve: " - << PrettyPrinter::print(reserve_size, TUnit::BYTES) - << ", sink name: " << _sink->get_name() - << ", node id: " << _sink->node_id() - << ", task id: " << _state->task_id() - << ", failed: " << st.to_string() - << ", debug info: " << GlobalMemoryArbitrator::process_mem_log_str(); + auto debug_msg = fmt::format( + "Query: {} , try to reserve: {}, operator name: {}, operator id: {}, " + "task id: " + "{}, 
revocable mem size: {}, failed: {}", + print_id(query_id), PrettyPrinter::print_bytes(reserve_size), + _root->get_name(), _root->node_id(), _state->task_id(), + PrettyPrinter::print_bytes(get_revocable_size()), st.to_string()); + // PROCESS_MEMORY_EXCEEDED error msg already contains process_mem_log_str + if (!st.is<ErrorCode::PROCESS_MEMORY_EXCEEDED>()) { + debug_msg += fmt::format(", debug info: {}", + GlobalMemoryArbitrator::process_mem_log_str()); + } + LOG(INFO) << debug_msg; _state->get_query_ctx()->update_paused_reason(st); _state->get_query_ctx()->set_low_memory_mode(); @@ -453,13 +459,19 @@ Status PipelineTask::execute(bool* eos) { if (!status.ok()) { COUNTER_UPDATE(_memory_reserve_failed_times, 1); - VLOG_DEBUG << "Query: " << print_id(query_id) << ", try to reserve: " - << PrettyPrinter::print(sink_reserve_size, TUnit::BYTES) - << ", sink name: " << _sink->get_name() - << ", node id: " << _sink->node_id() - << ", task id: " << _state->task_id() - << ", failed: " << status.to_string() - << ", debug info: " << GlobalMemoryArbitrator::process_mem_log_str(); + auto debug_msg = fmt::format( + "Query: {} try to reserve: {}, sink name: {}, node id: {}, task id: " + "{}, revocable mem size: {}, failed: {}", + print_id(query_id), PrettyPrinter::print_bytes(sink_reserve_size), + _sink->get_name(), _sink->node_id(), _state->task_id(), + PrettyPrinter::print_bytes(get_revocable_size()), status.to_string()); + // PROCESS_MEMORY_EXCEEDED error msg already contains process_mem_log_str + if (!status.is<ErrorCode::PROCESS_MEMORY_EXCEEDED>()) { + debug_msg += fmt::format(", debug info: {}", + GlobalMemoryArbitrator::process_mem_log_str()); + } + LOG(INFO) << debug_msg; + DCHECK_EQ(_pending_block.get(), nullptr); _pending_block = std::move(_block); _block = vectorized::Block::create_unique(_pending_block->clone_empty()); diff --git a/be/src/runtime/query_context.cpp b/be/src/runtime/query_context.cpp index 0b55a1bf305..f69e8efe653 100644 --- a/be/src/runtime/query_context.cpp
+++ b/be/src/runtime/query_context.cpp @@ -476,17 +476,17 @@ Status QueryContext::revoke_memory() { // should free 200MB memory, not 300MB const auto target_revoking_size = (int64_t)(query_mem_tracker->consumption() * 0.2); size_t revoked_size = 0; + size_t total_revokable_size = 0; std::vector<pipeline::PipelineTask*> chosen_tasks; for (auto&& [revocable_size, task] : tasks) { - chosen_tasks.emplace_back(task); - - revoked_size += revocable_size; // Only revoke the largest task to ensure memory is used as much as possible // break; - if (revoked_size >= target_revoking_size) { - break; + if (revoked_size < target_revoking_size) { + chosen_tasks.emplace_back(task); + revoked_size += revocable_size; } + total_revokable_size += revocable_size; } std::weak_ptr<QueryContext> this_ctx = shared_from_this(); @@ -502,13 +502,14 @@ Status QueryContext::revoke_memory() { query_context->set_memory_sufficient(true); }); + LOG(INFO) << fmt::format( + "{}, spill context: {}, revokable mem: {}/{}, tasks count: {}/{}", this->debug_string(), + ((void*)spill_context.get()), PrettyPrinter::print_bytes(revoked_size), + PrettyPrinter::print_bytes(total_revokable_size), chosen_tasks.size(), tasks.size()); + for (auto* task : chosen_tasks) { RETURN_IF_ERROR(task->revoke_memory(spill_context)); } - - LOG(INFO) << this->debug_string() << ", context: " << ((void*)spill_context.get()) - << " total revoked size: " << PrettyPrinter::print(revoked_size, TUnit::BYTES) - << ", tasks count: " << chosen_tasks.size() << "/" << tasks.size(); return Status::OK(); } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org