This is an automated email from the ASF dual-hosted git repository.

jacktengg pushed a commit to branch spill_and_reserve
in repository https://gitbox.apache.org/repos/asf/doris.git

commit 7982d9c69098540ec11535b6469f3db708012a67
Author: jacktengg <tengjianp...@selectdb.com>
AuthorDate: Thu Dec 12 23:06:53 2024 +0800

    improve log
---
 .../exec/partitioned_hash_join_sink_operator.cpp   | 38 ++++++++++----------
 be/src/pipeline/pipeline_task.cpp                  | 40 ++++++++++++++--------
 be/src/runtime/query_context.cpp                   | 19 +++++-----
 3 files changed, 55 insertions(+), 42 deletions(-)

diff --git a/be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp 
b/be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp
index abf81a068e7..09e13eb465d 100644
--- a/be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp
+++ b/be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp
@@ -29,6 +29,7 @@
 #include "pipeline/pipeline_task.h"
 #include "runtime/fragment_mgr.h"
 #include "util/mem_info.h"
+#include "util/pretty_printer.h"
 #include "util/runtime_profile.h"
 #include "vec/spill/spill_stream.h"
 #include "vec/spill/spill_stream_manager.h"
@@ -286,8 +287,8 @@ Status 
PartitionedHashJoinSinkLocalState::_revoke_unpartitioned_block(
 Status PartitionedHashJoinSinkLocalState::revoke_memory(
         RuntimeState* state, const std::shared_ptr<SpillContext>& 
spill_context) {
     SCOPED_TIMER(_spill_total_timer);
-    VLOG_DEBUG << "Query: " << print_id(state->query_id()) << ", task: " << 
state->task_id()
-               << " hash join sink " << _parent->node_id() << " revoke_memory"
+    VLOG_DEBUG << "Query: " << print_id(state->query_id()) << ", task " << 
state->task_id()
+               << " sink " << _parent->node_id() << " revoke_memory"
                << ", eos: " << _child_eos;
     DCHECK_EQ(_spilling_task_count, 0);
     CHECK_EQ(_spill_dependency->is_blocked_by(nullptr), nullptr);
@@ -312,9 +313,9 @@ Status PartitionedHashJoinSinkLocalState::revoke_memory(
 
         Status status;
         if (_child_eos) {
-            VLOG_DEBUG << "Query:" << print_id(this->state()->query_id()) << 
", hash join sink "
-                       << _parent->node_id() << " set_ready_to_read"
-                       << ", task id: " << state->task_id();
+            VLOG_DEBUG << "Query:" << print_id(this->state()->query_id()) << 
", task "
+                       << state->task_id() << " sink " << _parent->node_id()
+                       << " set_ready_to_read";
             std::for_each(_shared_state->partitioned_build_blocks.begin(),
                           _shared_state->partitioned_build_blocks.end(), 
[&](auto& block) {
                               if (block) {
@@ -398,9 +399,8 @@ Status PartitionedHashJoinSinkLocalState::revoke_memory(
     }
 
     if (_child_eos) {
-        VLOG_DEBUG << "Query:" << print_id(state->query_id()) << ", hash join 
sink "
-                   << _parent->node_id() << " set_ready_to_read"
-                   << ", task id: " << state->task_id();
+        VLOG_DEBUG << "Query:" << print_id(state->query_id()) << ", task " << 
state->task_id()
+                   << " sink " << _parent->node_id() << " set_ready_to_read";
         std::for_each(_shared_state->partitioned_build_blocks.begin(),
                       _shared_state->partitioned_build_blocks.end(), [&](auto& 
block) {
                           if (block) {
@@ -587,9 +587,9 @@ Status PartitionedHashJoinSinkOperatorX::sink(RuntimeState* 
state, vectorized::B
     const auto need_to_spill = local_state._shared_state->need_to_spill;
     if (rows == 0) {
         if (eos) {
-            VLOG_DEBUG << "Query: " << print_id(state->query_id()) << ", hash 
join sink "
-                       << node_id() << " sink eos, set_ready_to_read"
-                       << ", task id: " << state->task_id() << ", need spill: 
" << need_to_spill;
+            VLOG_DEBUG << "Query: " << print_id(state->query_id()) << ", task 
" << state->task_id()
+                       << " sink " << node_id() << " eos, set_ready_to_read"
+                       << ", need spill: " << need_to_spill;
 
             if (need_to_spill) {
                 return revoke_memory(state, nullptr);
@@ -605,11 +605,11 @@ Status 
PartitionedHashJoinSinkOperatorX::sink(RuntimeState* state, vectorized::B
                 Defer defer {[&]() { local_state.update_memory_usage(); }};
                 RETURN_IF_ERROR(_inner_sink_operator->sink(
                         local_state._shared_state->inner_runtime_state.get(), 
in_block, eos));
-                VLOG_DEBUG << "Query: " << print_id(state->query_id()) << 
"hash join sink "
-                           << node_id() << " sink eos, set_ready_to_read"
-                           << ", task id: " << state->task_id() << ", nonspill 
build usage: "
-                           << _inner_sink_operator->get_memory_usage(
-                                      
local_state._shared_state->inner_runtime_state.get());
+                VLOG_DEBUG << "Query: " << print_id(state->query_id()) << " 
task "
+                           << state->task_id() << " sink " << node_id() << " 
eos, set_ready_to_read"
+                           << ", nonspill build usage: "
+                           << 
PrettyPrinter::print_bytes(_inner_sink_operator->get_memory_usage(
+                                      
local_state._shared_state->inner_runtime_state.get()));
             }
 
             
std::for_each(local_state._shared_state->partitioned_build_blocks.begin(),
@@ -646,9 +646,9 @@ Status PartitionedHashJoinSinkOperatorX::sink(RuntimeState* 
state, vectorized::B
                 local_state._shared_state->inner_runtime_state.get(), 
in_block, eos));
 
         if (eos) {
-            VLOG_DEBUG << "Query: " << print_id(state->query_id()) << ", hash 
join sink "
-                       << node_id() << " sink eos, set_ready_to_read"
-                       << ", task id: " << state->task_id() << ", nonspill 
build usage: "
+            VLOG_DEBUG << "Query: " << print_id(state->query_id()) << " task " 
<< state->task_id()
+                       << " sink " << node_id() << " eos, set_ready_to_read"
+                       << ", nonspill build usage: "
                        << _inner_sink_operator->get_memory_usage(
                                   
local_state._shared_state->inner_runtime_state.get());
             local_state._dependency->set_ready_to_read();
diff --git a/be/src/pipeline/pipeline_task.cpp 
b/be/src/pipeline/pipeline_task.cpp
index 0ae87287d96..a099ac66421 100644
--- a/be/src/pipeline/pipeline_task.cpp
+++ b/be/src/pipeline/pipeline_task.cpp
@@ -421,13 +421,19 @@ Status PipelineTask::execute(bool* eos) {
                 COUNTER_UPDATE(_memory_reserve_times, 1);
                 if (!st.ok()) {
                     COUNTER_UPDATE(_memory_reserve_failed_times, 1);
-                    VLOG_DEBUG << "Query: " << print_id(query_id) << ", try to 
reserve: "
-                               << PrettyPrinter::print(reserve_size, 
TUnit::BYTES)
-                               << ", sink name: " << _sink->get_name()
-                               << ", node id: " << _sink->node_id()
-                               << ", task id: " << _state->task_id()
-                               << ", failed: " << st.to_string()
-                               << ", debug info: " << 
GlobalMemoryArbitrator::process_mem_log_str();
+                    auto debug_msg = fmt::format(
+                            "Query: {} , try to reserve: {}, operator name: 
{}, operator id: {}, "
+                            "task id: "
+                            "{}, revocable mem size: {}, failed: {}",
+                            print_id(query_id), 
PrettyPrinter::print_bytes(reserve_size),
+                            _root->get_name(), _root->node_id(), 
_state->task_id(),
+                            PrettyPrinter::print_bytes(get_revocable_size()), 
st.to_string());
+                    // PROCESS_MEMORY_EXCEEDED error msg alread contains 
process_mem_log_str
+                    if (!st.is<ErrorCode::PROCESS_MEMORY_EXCEEDED>()) {
+                        debug_msg += fmt::format(", debug info: {}",
+                                                 
GlobalMemoryArbitrator::process_mem_log_str());
+                    }
+                    LOG(INFO) << debug_msg;
 
                     _state->get_query_ctx()->update_paused_reason(st);
                     _state->get_query_ctx()->set_low_memory_mode();
@@ -453,13 +459,19 @@ Status PipelineTask::execute(bool* eos) {
 
                 if (!status.ok()) {
                     COUNTER_UPDATE(_memory_reserve_failed_times, 1);
-                    VLOG_DEBUG << "Query: " << print_id(query_id) << ", try to 
reserve: "
-                               << PrettyPrinter::print(sink_reserve_size, 
TUnit::BYTES)
-                               << ", sink name: " << _sink->get_name()
-                               << ", node id: " << _sink->node_id()
-                               << ", task id: " << _state->task_id()
-                               << ", failed: " << status.to_string()
-                               << ", debug info: " << 
GlobalMemoryArbitrator::process_mem_log_str();
+                    auto debug_msg = fmt::format(
+                            "Query: {} try to reserve: {}, sink name: {}, node 
id: {}, task id: "
+                            "{}, revocable mem size: {}, failed: {}",
+                            print_id(query_id), 
PrettyPrinter::print_bytes(sink_reserve_size),
+                            _sink->get_name(), _sink->node_id(), 
_state->task_id(),
+                            PrettyPrinter::print_bytes(get_revocable_size()), 
status.to_string());
+                    // PROCESS_MEMORY_EXCEEDED error msg alread contains 
process_mem_log_str
+                    if (!status.is<ErrorCode::PROCESS_MEMORY_EXCEEDED>()) {
+                        debug_msg += fmt::format(", debug info: {}",
+                                                 
GlobalMemoryArbitrator::process_mem_log_str());
+                    }
+                    LOG(INFO) << debug_msg;
+
                     DCHECK_EQ(_pending_block.get(), nullptr);
                     _pending_block = std::move(_block);
                     _block = 
vectorized::Block::create_unique(_pending_block->clone_empty());
diff --git a/be/src/runtime/query_context.cpp b/be/src/runtime/query_context.cpp
index 0b55a1bf305..f69e8efe653 100644
--- a/be/src/runtime/query_context.cpp
+++ b/be/src/runtime/query_context.cpp
@@ -476,17 +476,17 @@ Status QueryContext::revoke_memory() {
     // should free 200MB memory, not 300MB
     const auto target_revoking_size = 
(int64_t)(query_mem_tracker->consumption() * 0.2);
     size_t revoked_size = 0;
+    size_t total_revokable_size = 0;
 
     std::vector<pipeline::PipelineTask*> chosen_tasks;
     for (auto&& [revocable_size, task] : tasks) {
-        chosen_tasks.emplace_back(task);
-
-        revoked_size += revocable_size;
         // Only revoke the largest task to ensure memory is used as much as 
possible
         // break;
-        if (revoked_size >= target_revoking_size) {
-            break;
+        if (revoked_size < target_revoking_size) {
+            chosen_tasks.emplace_back(task);
+            revoked_size += revocable_size;
         }
+        total_revokable_size += revocable_size;
     }
 
     std::weak_ptr<QueryContext> this_ctx = shared_from_this();
@@ -502,13 +502,14 @@ Status QueryContext::revoke_memory() {
                 query_context->set_memory_sufficient(true);
             });
 
+    LOG(INFO) << fmt::format(
+            "{}, spill context: {}, revokable mem: {}/{}, tasks count: {}/{}", 
this->debug_string(),
+            ((void*)spill_context.get()), 
PrettyPrinter::print_bytes(revoked_size),
+            PrettyPrinter::print_bytes(total_revokable_size), 
chosen_tasks.size(), tasks.size());
+
     for (auto* task : chosen_tasks) {
         RETURN_IF_ERROR(task->revoke_memory(spill_context));
     }
-
-    LOG(INFO) << this->debug_string() << ", context: " << 
((void*)spill_context.get())
-              << " total revoked size: " << PrettyPrinter::print(revoked_size, 
TUnit::BYTES)
-              << ", tasks count: " << chosen_tasks.size() << "/" << 
tasks.size();
     return Status::OK();
 }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to