This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch spill_and_reserve
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/spill_and_reserve by this push:
     new d46495ebf72 Add spill metrics and improve spill log printing (#46029)
d46495ebf72 is described below

commit d46495ebf72557fd3cecdffb1bd9870131acd60c
Author: TengJianPing <tengjianp...@selectdb.com>
AuthorDate: Fri Dec 27 14:41:31 2024 +0800

    Add spill metrics and improve spill log printing (#46029)
---
 be/src/olap/memtable_flush_executor.cpp            |  3 +-
 .../exec/partitioned_aggregation_sink_operator.cpp | 25 +++---
 .../partitioned_aggregation_source_operator.cpp    | 32 +++----
 .../exec/partitioned_hash_join_probe_operator.cpp  | 98 +++++++++++-----------
 .../exec/partitioned_hash_join_sink_operator.cpp   | 50 +++++------
 be/src/pipeline/exec/spill_sort_sink_operator.cpp  | 15 ++--
 .../pipeline/exec/spill_sort_source_operator.cpp   | 23 ++---
 be/src/vec/exec/scan/scanner_scheduler.cpp         |  8 +-
 be/src/vec/spill/spill_reader.cpp                  | 11 ++-
 be/src/vec/spill/spill_stream_manager.cpp          | 21 +++++
 be/src/vec/spill/spill_stream_manager.h            | 22 +++++
 be/src/vec/spill/spill_writer.cpp                  |  2 +
 12 files changed, 186 insertions(+), 124 deletions(-)

diff --git a/be/src/olap/memtable_flush_executor.cpp b/be/src/olap/memtable_flush_executor.cpp
index 5533a360fac..9648c3fe098 100644
--- a/be/src/olap/memtable_flush_executor.cpp
+++ b/be/src/olap/memtable_flush_executor.cpp
@@ -161,7 +161,8 @@ Status FlushToken::_try_reserve_memory(QueryThreadContext 
query_thread_context,
         if (memtable_flush_executor->check_and_inc_has_any_flushing_task()) {
             // If there are already any flushing task, Wait for some time and 
retry.
             LOG_EVERY_T(INFO, 60) << fmt::format(
-                    "Failed to reserve memory {} for flush memtable, retry 
after 100ms", size);
+                    "Failed to reserve memory {} for flush memtable, retry 
after 100ms",
+                    PrettyPrinter::print_bytes(size));
             std::this_thread::sleep_for(std::chrono::milliseconds(100));
         } else {
             st = Status::OK();
diff --git a/be/src/pipeline/exec/partitioned_aggregation_sink_operator.cpp b/be/src/pipeline/exec/partitioned_aggregation_sink_operator.cpp
index 8cc6ae58a4f..ad7f4e6f184 100644
--- a/be/src/pipeline/exec/partitioned_aggregation_sink_operator.cpp
+++ b/be/src/pipeline/exec/partitioned_aggregation_sink_operator.cpp
@@ -186,9 +186,9 @@ Status 
PartitionedAggSinkOperatorX::sink(doris::RuntimeState* state, vectorized:
         revocable_size = revocable_mem_size(state);
         query_mem_limit = state->get_query_ctx()->get_mem_limit();
         LOG(INFO) << fmt::format(
-                "Query: {}, task {}, agg sink {} eos, need spill: {}, query 
mem limit: {}, "
-                "revocable memory: {}",
-                print_id(state->query_id()), state->task_id(), node_id(),
+                "Query:{}, agg sink:{}, task:{}, eos, need spill:{}, query mem 
limit:{}, "
+                "revocable memory:{}",
+                print_id(state->query_id()), node_id(), state->task_id(),
                 local_state._shared_state->is_spilled, 
PrettyPrinter::print_bytes(query_mem_limit),
                 PrettyPrinter::print_bytes(revocable_size));
 
@@ -268,9 +268,9 @@ Status PartitionedAggSinkLocalState::revoke_memory(
         RuntimeState* state, const std::shared_ptr<SpillContext>& 
spill_context) {
     const auto size_to_revoke = _parent->revocable_mem_size(state);
     LOG(INFO) << fmt::format(
-            "Query: {}, task {}, agg sink {} revoke_memory, eos: {}, need 
spill: {}, revocable "
-            "memory: {}",
-            print_id(state->query_id()), state->task_id(), _parent->node_id(), 
_eos,
+            "Query:{}, agg sink:{}, task:{}, revoke_memory, eos:{}, need 
spill:{}, revocable "
+            "memory:{}",
+            print_id(state->query_id()), _parent->node_id(), state->task_id(), 
_eos,
             _shared_state->is_spilled,
             PrettyPrinter::print_bytes(_parent->revocable_mem_size(state)));
     if (!_shared_state->is_spilled) {
@@ -316,16 +316,17 @@ Status PartitionedAggSinkLocalState::revoke_memory(
                 Defer defer {[&]() {
                     if (!status.ok() || state->is_cancelled()) {
                         if (!status.ok()) {
-                            LOG(WARNING) << "Query " << print_id(query_id) << 
" agg node "
-                                         << Base::_parent->node_id()
-                                         << " revoke_memory error: " << status;
+                            LOG(WARNING) << fmt::format(
+                                    "Query:{}, agg sink:{}, task:{}, 
revoke_memory error:{}",
+                                    print_id(query_id), 
Base::_parent->node_id(), state->task_id(),
+                                    status);
                         }
                         _shared_state->close();
                     } else {
                         LOG(INFO) << fmt::format(
-                                "Query: {}, task {}, agg sink {} revoke_memory 
finish, eos: {}, "
-                                "revocable memory: {}",
-                                print_id(state->query_id()), state->task_id(), 
_parent->node_id(),
+                                "Query:{}, agg sink:{}, task:{}, revoke_memory 
finish, eos:{}, "
+                                "revocable memory:{}",
+                                print_id(state->query_id()), 
_parent->node_id(), state->task_id(),
                                 _eos,
                                 
PrettyPrinter::print_bytes(_parent->revocable_mem_size(state)));
                     }
diff --git a/be/src/pipeline/exec/partitioned_aggregation_source_operator.cpp b/be/src/pipeline/exec/partitioned_aggregation_source_operator.cpp
index 8e221a1c7e2..c87ee24dedb 100644
--- a/be/src/pipeline/exec/partitioned_aggregation_source_operator.cpp
+++ b/be/src/pipeline/exec/partitioned_aggregation_source_operator.cpp
@@ -250,8 +250,9 @@ Status 
PartitionedAggLocalState::recover_blocks_from_disk(RuntimeState* state, b
         Defer defer {[&]() {
             if (!status.ok() || state->is_cancelled()) {
                 if (!status.ok()) {
-                    LOG(WARNING) << "Query " << print_id(query_id) << " agg 
node "
-                                 << _parent->node_id() << " recover agg data 
error: " << status;
+                    LOG(WARNING) << fmt::format(
+                            "Query:{}, agg probe:{}, task:{}, recover agg data 
error:{}",
+                            print_id(query_id), _parent->node_id(), 
state->task_id(), status);
                 }
                 _shared_state->close();
             }
@@ -305,15 +306,16 @@ Status 
PartitionedAggLocalState::recover_blocks_from_disk(RuntimeState* state, b
             }
         }
 
-        VLOG_DEBUG << "Query: " << print_id(query_id) << " agg node " << 
_parent->node_id()
-                   << ", task id: " << state->task_id() << " recover 
partitioned finished, "
-                   << _shared_state->spill_partitions.size() << " partitions 
left, "
-                   << accumulated_blocks_size
-                   << " bytes read, spill dep: " << 
(void*)(_spill_dependency.get());
+        VLOG_DEBUG << fmt::format(
+                "Query:{}, agg probe:{}, task:{}, recover partitioned 
finished, partitions "
+                "left:{}, bytes read:{}, spill dep:{}",
+                print_id(query_id), _parent->node_id(), state->task_id(),
+                _shared_state->spill_partitions.size(), 
accumulated_blocks_size,
+                (void*)(_spill_dependency.get()));
         return status;
     };
 
-    auto exception_catch_func = [spill_func, query_id]() {
+    auto exception_catch_func = [this, state, spill_func, query_id]() {
         
DBUG_EXECUTE_IF("fault_inject::partitioned_agg_source::merge_spill_data_cancel",
 {
             auto st = Status::InternalError(
                     "fault_inject partitioned_agg_source "
@@ -323,8 +325,9 @@ Status 
PartitionedAggLocalState::recover_blocks_from_disk(RuntimeState* state, b
         });
 
         auto status = [&]() { RETURN_IF_CATCH_EXCEPTION({ return spill_func(); 
}); }();
-        LOG_IF(INFO, !status.ok()) << "Query : " << print_id(query_id)
-                                   << " recover exception : " << 
status.to_string();
+        LOG_IF(INFO, !status.ok()) << fmt::format(
+                "Query:{}, agg probe:{}, task:{}, recover exception:{}", 
print_id(query_id),
+                _parent->node_id(), state->task_id(), status.to_string());
         return status;
     };
 
@@ -334,10 +337,11 @@ Status 
PartitionedAggLocalState::recover_blocks_from_disk(RuntimeState* state, b
     });
     _spill_dependency->block();
 
-    VLOG_DEBUG << "Query: " << print_id(query_id) << " agg node " << 
_parent->node_id()
-               << ", task id: " << state->task_id() << " begin to recover, "
-               << _shared_state->spill_partitions.size()
-               << " partitions left, _spill_dependency: " << 
(void*)(_spill_dependency.get());
+    VLOG_DEBUG << fmt::format(
+            "Query:{}, agg probe:{}, task:{}, begin to recover, partitions 
left:{}, "
+            "_spill_dependency:{}",
+            print_id(query_id), _parent->node_id(), state->task_id(),
+            _shared_state->spill_partitions.size(), 
(void*)(_spill_dependency.get()));
     return 
ExecEnv::GetInstance()->spill_stream_mgr()->get_spill_io_thread_pool()->submit(
             std::make_shared<SpillRecoverRunnable>(state, _spill_dependency, 
_runtime_profile.get(),
                                                    
_shared_state->shared_from_this(),
diff --git a/be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp b/be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp
index 04b83e822c1..ff9c78c5be4 100644
--- a/be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp
+++ b/be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp
@@ -230,9 +230,11 @@ Status 
PartitionedHashJoinProbeLocalState::spill_probe_blocks(RuntimeState* stat
         }
 
         COUNTER_SET(_probe_blocks_bytes, int64_t(not_revoked_size));
-        VLOG_DEBUG << "Query: " << print_id(query_id)
-                   << " hash probe revoke done, node: " << p.node_id()
-                   << ", task: " << state->task_id();
+
+        VLOG_DEBUG << fmt::format(
+                "Query:{}, hash join probe:{}, task:{},"
+                " spill_probe_blocks done",
+                print_id(query_id), p.node_id(), state->task_id());
         return Status::OK();
     };
 
@@ -275,9 +277,10 @@ Status 
PartitionedHashJoinProbeLocalState::finish_spilling(uint32_t partition_in
 Status 
PartitionedHashJoinProbeLocalState::recover_build_blocks_from_disk(RuntimeState*
 state,
                                                                           
uint32_t partition_index,
                                                                           
bool& has_data) {
-    VLOG_DEBUG << "Query: " << print_id(state->query_id()) << ", node: " << 
_parent->node_id()
-               << ", task id: " << state->task_id() << ", partition: " << 
partition_index
-               << " recover_build_blocks_from_disk";
+    VLOG_DEBUG << fmt::format(
+            "Query:{}, hash join probe:{}, task:{},"
+            " partition:{}, recover_build_blocks_from_disk",
+            print_id(state->query_id()), _parent->node_id(), state->task_id(), 
partition_index);
     auto& spilled_stream = _shared_state->spilled_streams[partition_index];
     has_data = false;
     if (!spilled_stream) {
@@ -291,9 +294,10 @@ Status 
PartitionedHashJoinProbeLocalState::recover_build_blocks_from_disk(Runtim
         SCOPED_TIMER(_recovery_build_timer);
 
         bool eos = false;
-        VLOG_DEBUG << "Query: " << print_id(state->query_id()) << ", node: " 
<< _parent->node_id()
-                   << ", task id: " << state->task_id() << ", partition: " << 
partition_index
-                   << ", recoverying build data";
+        VLOG_DEBUG << fmt::format(
+                "Query:{}, hash join probe:{}, task:{},"
+                " partition:{}, recoverying build data",
+                print_id(state->query_id()), _parent->node_id(), 
state->task_id(), partition_index);
         Status status;
         while (!eos) {
             vectorized::Block block;
@@ -315,7 +319,11 @@ Status 
PartitionedHashJoinProbeLocalState::recover_build_blocks_from_disk(Runtim
             }
 
             if (UNLIKELY(state->is_cancelled())) {
-                LOG(INFO) << "recovery build block when canceled.";
+                LOG(INFO) << fmt::format(
+                        "Query:{}, hash join probe:{}, task:{},"
+                        " partition:{}, recovery build data canceled",
+                        print_id(state->query_id()), _parent->node_id(), 
state->task_id(),
+                        partition_index);
                 break;
             }
 
@@ -338,9 +346,11 @@ Status 
PartitionedHashJoinProbeLocalState::recover_build_blocks_from_disk(Runtim
         if (eos) {
             
ExecEnv::GetInstance()->spill_stream_mgr()->delete_spill_stream(spilled_stream);
             _shared_state->spilled_streams[partition_index].reset();
-            VLOG_DEBUG << "Query: " << print_id(state->query_id())
-                       << ", node: " << _parent->node_id() << ", task id: " << 
state->task_id()
-                       << ", partition: " << partition_index;
+            VLOG_DEBUG << fmt::format(
+                    "Query:{}, hash join probe:{}, task:{},"
+                    " partition:{}, recovery build data eos",
+                    print_id(state->query_id()), _parent->node_id(), 
state->task_id(),
+                    partition_index);
         }
         return status;
     };
@@ -365,16 +375,6 @@ Status 
PartitionedHashJoinProbeLocalState::recover_build_blocks_from_disk(Runtim
     auto* spill_io_pool = 
ExecEnv::GetInstance()->spill_stream_mgr()->get_spill_io_thread_pool();
     has_data = true;
     _spill_dependency->block();
-    {
-        auto* pipeline_task = state->get_task();
-        if (pipeline_task) {
-            auto& p = _parent->cast<PartitionedHashJoinProbeOperatorX>();
-            VLOG_DEBUG << "Query: " << print_id(state->query_id()) << ", node: 
" << p.node_id()
-                       << ", task id: " << state->task_id() << ", partition: " 
<< partition_index
-                       << ", dependency: " << _dependency
-                       << ", task debug_string: " << 
pipeline_task->debug_string();
-        }
-    }
 
     
DBUG_EXECUTE_IF("fault_inject::partitioned_hash_join_probe::recovery_build_blocks_submit_func",
                     {
@@ -386,9 +386,6 @@ Status 
PartitionedHashJoinProbeLocalState::recover_build_blocks_from_disk(Runtim
     auto spill_runnable = std::make_shared<SpillRecoverRunnable>(
             state, _spill_dependency, _runtime_profile.get(), 
_shared_state->shared_from_this(),
             exception_catch_func);
-    VLOG_DEBUG << "Query: " << print_id(state->query_id()) << ", node: " << 
_parent->node_id()
-               << ", task id: " << state->task_id() << ", partition: " << 
partition_index
-               << " recover_build_blocks_from_disk submit func";
     return spill_io_pool->submit(std::move(spill_runnable));
 }
 
@@ -429,7 +426,7 @@ Status 
PartitionedHashJoinProbeLocalState::recover_probe_blocks_from_disk(Runtim
 
     auto query_id = state->query_id();
 
-    auto read_func = [this, query_id, &spilled_stream, &blocks] {
+    auto read_func = [this, query_id, partition_index, &spilled_stream, 
&blocks] {
         SCOPED_TIMER(_recovery_probe_timer);
 
         vectorized::Block block;
@@ -457,8 +454,10 @@ Status 
PartitionedHashJoinProbeLocalState::recover_probe_blocks_from_disk(Runtim
             }
         }
         if (eos) {
-            VLOG_DEBUG << "Query: " << print_id(query_id)
-                       << ", recovery probe data done: " << 
spilled_stream->get_spill_dir();
+            VLOG_DEBUG << fmt::format(
+                    "Query:{}, hash join probe:{}, task:{},"
+                    " partition:{}, recovery probe data done",
+                    print_id(query_id), _parent->node_id(), _state->task_id(), 
partition_index);
             
ExecEnv::GetInstance()->spill_stream_mgr()->delete_spill_stream(spilled_stream);
             spilled_stream.reset();
         }
@@ -675,13 +674,13 @@ Status 
PartitionedHashJoinProbeOperatorX::_setup_internal_operators(
 
     
RETURN_IF_ERROR(_inner_sink_operator->sink(local_state._shared_state->inner_runtime_state.get(),
                                                &block, true));
-    VLOG_DEBUG << "Query: " << print_id(state->query_id())
-               << ", internal build operator finished, node id: " << node_id()
-               << ", task id: " << state->task_id()
-               << ", partition: " << local_state._partition_cursor << "rows: " 
<< block.rows()
-               << ", usage: "
-               << _inner_sink_operator->get_memory_usage(
-                          
local_state._shared_state->inner_runtime_state.get());
+    VLOG_DEBUG << fmt::format(
+            "Query:{}, hash join probe:{}, task:{},"
+            " internal build operator finished, partition:{}, rows:{}, memory 
usage:{}",
+            print_id(state->query_id()), node_id(), state->task_id(), 
local_state._partition_cursor,
+            block.rows(),
+            _inner_sink_operator->get_memory_usage(
+                    local_state._shared_state->inner_runtime_state.get()));
 
     COUNTER_SET(local_state._hash_table_memory_usage,
                 
sink_local_state->profile()->get_counter("MemoryUsageHashTable")->value());
@@ -734,9 +733,10 @@ Status 
PartitionedHashJoinProbeOperatorX::pull(doris::RuntimeState* state,
             if (!has_data) {
                 vectorized::Block block;
                 RETURN_IF_ERROR(_inner_probe_operator->push(runtime_state, 
&block, true));
-                VLOG_DEBUG << "Query: " << print_id(state->query_id()) << ", 
node: " << node_id()
-                           << ", task: " << state->task_id() << "partition: " 
<< partition_index
-                           << " has no data to recovery";
+                VLOG_DEBUG << fmt::format(
+                        "Query:{}, hash join probe:{}, task:{},"
+                        " partition:{}, has no data to recovery",
+                        print_id(state->query_id()), node_id(), 
state->task_id(), partition_index);
                 break;
             } else {
                 return Status::OK();
@@ -755,9 +755,11 @@ Status 
PartitionedHashJoinProbeOperatorX::pull(doris::RuntimeState* state,
 
     *eos = false;
     if (in_mem_eos) {
-        VLOG_DEBUG << "Query: " << print_id(state->query_id()) << ", node: " 
<< node_id()
-                   << ", task: " << state->task_id()
-                   << ", partition: " << local_state._partition_cursor;
+        VLOG_DEBUG << fmt::format(
+                "Query:{}, hash join probe:{}, task:{},"
+                " partition:{}, probe done",
+                print_id(state->query_id()), node_id(), state->task_id(),
+                local_state._partition_cursor);
         local_state._partition_cursor++;
         if (local_state._partition_cursor == _partition_count) {
             *eos = true;
@@ -848,8 +850,8 @@ size_t 
PartitionedHashJoinProbeOperatorX::get_reserve_mem_size(RuntimeState* sta
 
 Status PartitionedHashJoinProbeOperatorX::_revoke_memory(RuntimeState* state) {
     auto& local_state = get_local_state(state);
-    VLOG_DEBUG << "Query: " << print_id(state->query_id()) << ", hash probe 
node: " << node_id()
-               << ", task: " << state->task_id();
+    VLOG_DEBUG << fmt::format("Query:{}, hash join probe:{}, task:{}, 
revoke_memory",
+                              print_id(state->query_id()), node_id(), 
state->task_id());
 
     RETURN_IF_ERROR(local_state.spill_probe_blocks(state));
     return Status::OK();
@@ -894,10 +896,10 @@ Status 
PartitionedHashJoinProbeOperatorX::get_block(RuntimeState* state, vectori
 #ifndef NDEBUG
     Defer eos_check_defer([&] {
         if (*eos) {
-            LOG(INFO) << "Query: " << print_id(state->query_id())
-                      << ", hash probe node: " << node_id() << ", task: " << 
state->task_id()
-                      << ", eos with child eos: " << local_state._child_eos
-                      << ", need spill: " << need_to_spill;
+            LOG(INFO) << fmt::format(
+                    "Query:{}, hash join probe:{}, task:{}, child eos:{}, need 
spill:{}",
+                    print_id(state->query_id()), node_id(), state->task_id(),
+                    local_state._child_eos, need_to_spill);
         }
     });
 #endif
diff --git a/be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp b/be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp
index 2e2c38f04c3..a227d87aa1b 100644
--- a/be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp
+++ b/be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp
@@ -178,8 +178,10 @@ Status 
PartitionedHashJoinSinkLocalState::_revoke_unpartitioned_block(
     }
 
     if (build_block.rows() <= 1) {
-        LOG(WARNING) << "has no data to revoke, node: " << _parent->node_id()
-                     << ", task: " << state->task_id();
+        LOG(WARNING) << fmt::format(
+                "Query:{}, hash join sink:{}, task:{},"
+                " has no data to revoke",
+                print_id(state->query_id()), _parent->node_id(), 
state->task_id());
         if (spill_context) {
             spill_context->on_task_finished();
         }
@@ -270,9 +272,9 @@ Status 
PartitionedHashJoinSinkLocalState::_revoke_unpartitioned_block(
                           });
             status = _finish_spilling();
             VLOG_DEBUG << fmt::format(
-                    "Query: {}, task {}, hash join sink {} 
_revoke_unpartitioned_block "
+                    "Query:{}, hash join sink:{}, task:{}, 
_revoke_unpartitioned_block, "
                     "set_ready_to_read",
-                    print_id(state->query_id()), state->task_id(), 
_parent->node_id());
+                    print_id(state->query_id()), _parent->node_id(), 
state->task_id());
             _dependency->set_ready_to_read();
         }
 
@@ -303,9 +305,9 @@ Status 
PartitionedHashJoinSinkLocalState::_revoke_unpartitioned_block(
 Status PartitionedHashJoinSinkLocalState::revoke_memory(
         RuntimeState* state, const std::shared_ptr<SpillContext>& 
spill_context) {
     SCOPED_TIMER(_spill_total_timer);
-    VLOG_DEBUG << "Query: " << print_id(state->query_id()) << ", task " << 
state->task_id()
-               << " hash join sink " << _parent->node_id() << " revoke_memory"
-               << ", eos: " << _child_eos;
+    VLOG_DEBUG << fmt::format("Query:{}, hash join sink:{}, task:{}, 
revoke_memory, eos:{}",
+                              print_id(state->query_id()), _parent->node_id(), 
state->task_id(),
+                              _child_eos);
     CHECK_EQ(_spill_dependency->is_blocked_by(nullptr), nullptr);
 
     if (!_shared_state->need_to_spill) {
@@ -322,9 +324,9 @@ Status PartitionedHashJoinSinkLocalState::revoke_memory(
     auto spill_fin_cb = [this, state, query_id, spill_context]() {
         Status status;
         if (_child_eos) {
-            LOG(INFO) << "Query:" << print_id(this->state()->query_id()) << ", 
task "
-                      << state->task_id() << " hash join sink " << 
_parent->node_id()
-                      << " finish spilling, set_ready_to_read";
+            LOG(INFO) << fmt::format(
+                    "Query:{}, hash join sink:{}, task:{}, finish spilling, 
set_ready_to_read",
+                    print_id(this->state()->query_id()), _parent->node_id(), 
state->task_id());
             std::for_each(_shared_state->partitioned_build_blocks.begin(),
                           _shared_state->partitioned_build_blocks.end(), 
[&](auto& block) {
                               if (block) {
@@ -565,10 +567,9 @@ Status 
PartitionedHashJoinSinkOperatorX::sink(RuntimeState* state, vectorized::B
         revocable_size = revocable_mem_size(state);
         query_mem_limit = state->get_query_ctx()->get_mem_limit();
         LOG(INFO) << fmt::format(
-                "Query: {}, task {}, hash join sink {} eos, need spill: {}, 
query mem limit: {}, "
-                "revocable "
-                "memory: {}",
-                print_id(state->query_id()), state->task_id(), node_id(), 
need_to_spill,
+                "Query:{}, hash join sink:{}, task:{}, eos, need spill:{}, 
query mem limit:{}, "
+                "revocable memory:{}",
+                print_id(state->query_id()), node_id(), state->task_id(), 
need_to_spill,
                 PrettyPrinter::print_bytes(query_mem_limit),
                 PrettyPrinter::print_bytes(revocable_size));
     }
@@ -590,9 +591,9 @@ Status PartitionedHashJoinSinkOperatorX::sink(RuntimeState* 
state, vectorized::B
 
                 if (is_revocable_mem_high_watermark(state, revocable_size, 
query_mem_limit)) {
                     LOG(INFO) << fmt::format(
-                            "Query: {}, task {}, hash join sink {} eos, 
revoke_memory "
+                            "Query:{}, hash join sink:{}, task:{} eos, 
revoke_memory "
                             "because revocable memory is high",
-                            print_id(state->query_id()), state->task_id(), 
node_id());
+                            print_id(state->query_id()), node_id(), 
state->task_id());
                     return revoke_memory(state, nullptr);
                 }
 
@@ -601,10 +602,9 @@ Status 
PartitionedHashJoinSinkOperatorX::sink(RuntimeState* state, vectorized::B
                         local_state._shared_state->inner_runtime_state.get(), 
in_block, eos));
 
                 LOG(INFO) << fmt::format(
-                        "Query: {}, task {}, hash join sink {} eos, 
set_ready_to_read, nonspill "
-                        "memory "
-                        "usage: {}",
-                        print_id(state->query_id()), state->task_id(), 
node_id(),
+                        "Query:{}, hash join sink:{}, task:{}, eos, 
set_ready_to_read, nonspill "
+                        "memory usage:{}",
+                        print_id(state->query_id()), node_id(), 
state->task_id(),
                         _inner_sink_operator->get_memory_usage_debug_str(
                                 
local_state._shared_state->inner_runtime_state.get()));
             }
@@ -642,9 +642,9 @@ Status PartitionedHashJoinSinkOperatorX::sink(RuntimeState* 
state, vectorized::B
         if (eos) {
             if (is_revocable_mem_high_watermark(state, revocable_size, 
query_mem_limit)) {
                 LOG(INFO) << fmt::format(
-                        "Query: {}, task {}, hash join sink {} eos, 
revoke_memory "
+                        "Query:{}, hash join sink:{}, task:{}, eos, 
revoke_memory "
                         "because revocable memory is high",
-                        print_id(state->query_id()), state->task_id(), 
node_id());
+                        print_id(state->query_id()), node_id(), 
state->task_id());
                 return revoke_memory(state, nullptr);
             }
         }
@@ -653,9 +653,9 @@ Status PartitionedHashJoinSinkOperatorX::sink(RuntimeState* 
state, vectorized::B
         local_state.update_memory_usage();
         if (eos) {
             LOG(INFO) << fmt::format(
-                    "Query: {}, task {}, hash join sink {} eos, 
set_ready_to_read, nonspill memory "
-                    "usage: {}",
-                    print_id(state->query_id()), state->task_id(), node_id(),
+                    "Query:{}, hash join sink:{}, task:{}, eos, 
set_ready_to_read, nonspill memory "
+                    "usage:{}",
+                    print_id(state->query_id()), node_id(), state->task_id(),
                     _inner_sink_operator->get_memory_usage_debug_str(
                             
local_state._shared_state->inner_runtime_state.get()));
             local_state._dependency->set_ready_to_read();
diff --git a/be/src/pipeline/exec/spill_sort_sink_operator.cpp b/be/src/pipeline/exec/spill_sort_sink_operator.cpp
index 03c4072f7de..debe1d59710 100644
--- a/be/src/pipeline/exec/spill_sort_sink_operator.cpp
+++ b/be/src/pipeline/exec/spill_sort_sink_operator.cpp
@@ -195,9 +195,9 @@ Status SpillSortSinkLocalState::revoke_memory(RuntimeState* 
state,
         profile()->add_info_string("Spilled", "true");
     }
 
-    VLOG_DEBUG << "Query " << print_id(state->query_id()) << " sort node "
-               << Base::_parent->node_id() << " revoke_memory"
-               << ", eos: " << _eos;
+    VLOG_DEBUG << fmt::format("Query:{}, sort sink:{}, task:{}, revoke_memory, 
eos:{}",
+                              print_id(state->query_id()), _parent->node_id(), 
state->task_id(),
+                              _eos);
 
     auto status = 
ExecEnv::GetInstance()->spill_stream_mgr()->register_spill_stream(
             state, _spilling_stream, print_id(state->query_id()), "sort", 
_parent->node_id(),
@@ -219,13 +219,14 @@ Status 
SpillSortSinkLocalState::revoke_memory(RuntimeState* state,
         Defer defer {[&]() {
             if (!status.ok() || state->is_cancelled()) {
                 if (!status.ok()) {
-                    LOG(WARNING) << "Query " << print_id(query_id) << " sort 
node "
-                                 << _parent->node_id() << " revoke memory 
error: " << status;
+                    LOG(WARNING) << fmt::format(
+                            "Query:{}, sort sink:{}, task:{}, revoke memory 
error:{}",
+                            print_id(query_id), _parent->node_id(), 
state->task_id(), status);
                 }
                 _shared_state->close();
             } else {
-                VLOG_DEBUG << "Query " << print_id(query_id) << " sort node " 
<< _parent->node_id()
-                           << " revoke memory finish";
+                VLOG_DEBUG << fmt::format("Query:{}, sort sink:{}, task:{}, 
revoke memory finish",
+                                          print_id(query_id), 
_parent->node_id(), state->task_id());
             }
 
             if (!status.ok()) {
diff --git a/be/src/pipeline/exec/spill_sort_source_operator.cpp b/be/src/pipeline/exec/spill_sort_source_operator.cpp
index 8a58d0b1504..43bb8a65b6e 100644
--- a/be/src/pipeline/exec/spill_sort_source_operator.cpp
+++ b/be/src/pipeline/exec/spill_sort_source_operator.cpp
@@ -70,8 +70,8 @@ int 
SpillSortLocalState::_calc_spill_blocks_to_merge(RuntimeState* state) const
 }
 Status SpillSortLocalState::initiate_merge_sort_spill_streams(RuntimeState* 
state) {
     auto& parent = Base::_parent->template cast<Parent>();
-    VLOG_DEBUG << "Query " << print_id(state->query_id()) << " sort node " << 
_parent->node_id()
-               << " merge spill data";
+    VLOG_DEBUG << fmt::format("Query:{}, sort source:{}, task:{}, merge spill 
data",
+                              print_id(state->query_id()), _parent->node_id(), 
state->task_id());
     _spill_dependency->Dependency::block();
 
     auto query_id = state->query_id();
@@ -82,8 +82,9 @@ Status 
SpillSortLocalState::initiate_merge_sort_spill_streams(RuntimeState* stat
         Defer defer {[&]() {
             if (!status.ok() || state->is_cancelled()) {
                 if (!status.ok()) {
-                    LOG(WARNING) << "Query " << print_id(query_id) << " sort 
node "
-                                 << _parent->node_id() << " merge spill data 
error: " << status;
+                    LOG(WARNING) << fmt::format(
+                            "Query:{}, sort source:{}, task:{}, merge spill 
data error:{}",
+                            print_id(query_id), _parent->node_id(), 
state->task_id(), status);
                 }
                 _shared_state->close();
                 for (auto& stream : _current_merging_streams) {
@@ -91,18 +92,20 @@ Status 
SpillSortLocalState::initiate_merge_sort_spill_streams(RuntimeState* stat
                 }
                 _current_merging_streams.clear();
             } else {
-                VLOG_DEBUG << "Query " << print_id(query_id) << " sort node " 
<< _parent->node_id()
-                           << " merge spill data finish";
+                VLOG_DEBUG << fmt::format(
+                        "Query:{}, sort source:{}, task:{}, merge spill data 
finish",
+                        print_id(query_id), _parent->node_id(), 
state->task_id());
             }
         }};
         vectorized::Block merge_sorted_block;
         vectorized::SpillStreamSPtr tmp_stream;
         while (!state->is_cancelled()) {
             int max_stream_count = _calc_spill_blocks_to_merge(state);
-            VLOG_DEBUG << "Query " << print_id(query_id) << " sort node " << 
_parent->node_id()
-                       << " merge spill streams, streams count: "
-                       << _shared_state->sorted_streams.size()
-                       << ", curren merge max stream count: " << 
max_stream_count;
+            VLOG_DEBUG << fmt::format(
+                    "Query:{}, sort source:{}, task:{}, merge spill streams, 
streams count:{}, "
+                    "curren merge max stream count:{}",
+                    print_id(query_id), _parent->node_id(), state->task_id(),
+                    _shared_state->sorted_streams.size(), max_stream_count);
             {
                 SCOPED_TIMER(Base::_spill_recover_time);
                 status = _create_intermediate_merger(
diff --git a/be/src/vec/exec/scan/scanner_scheduler.cpp 
b/be/src/vec/exec/scan/scanner_scheduler.cpp
index d8858dd5aba..e0fb08c43ba 100644
--- a/be/src/vec/exec/scan/scanner_scheduler.cpp
+++ b/be/src/vec/exec/scan/scanner_scheduler.cpp
@@ -381,10 +381,10 @@ void 
ScannerScheduler::_scanner_scan(std::shared_ptr<ScannerContext> ctx,
                     
scan_task->cached_blocks.emplace_back(std::move(free_block), free_block_bytes);
                 }
                 if (scan_task->cached_blocks.back().first->rows() > 0) {
-                    auto block_avg_bytes =
-                            
(scan_task->cached_blocks.back().first->allocated_bytes() +
-                             scan_task->cached_blocks.back().first->rows() - 
1) /
-                            scan_task->cached_blocks.back().first->rows() * 
ctx->batch_size();
+                    auto block_avg_bytes = 
(scan_task->cached_blocks.back().first->bytes() +
+                                            
scan_task->cached_blocks.back().first->rows() - 1) /
+                                           
scan_task->cached_blocks.back().first->rows() *
+                                           ctx->batch_size();
                     scanner->update_block_avg_bytes(block_avg_bytes);
                 }
                 if (ctx->low_memory_mode()) {
diff --git a/be/src/vec/spill/spill_reader.cpp 
b/be/src/vec/spill/spill_reader.cpp
index 014b83be23d..40323f824a8 100644
--- a/be/src/vec/spill/spill_reader.cpp
+++ b/be/src/vec/spill/spill_reader.cpp
@@ -29,6 +29,7 @@
 #include "runtime/exec_env.h"
 #include "util/slice.h"
 #include "vec/core/block.h"
+#include "vec/spill/spill_stream_manager.h"
 namespace doris {
 #include "common/compile_check_begin.h"
 namespace io {
@@ -52,11 +53,12 @@ Status SpillReader::open() {
 
     Slice result((char*)&block_count_, sizeof(size_t));
 
+    size_t total_read_bytes = 0;
     // read block count
     size_t bytes_read = 0;
     RETURN_IF_ERROR(file_reader_->read_at(file_size - sizeof(size_t), result, 
&bytes_read));
     DCHECK(bytes_read == 8); // max_sub_block_size, block count
-    COUNTER_UPDATE(_read_file_size, bytes_read);
+    total_read_bytes += bytes_read;
     if (_query_statistics) {
         _query_statistics->add_spill_read_bytes_from_local_storage(bytes_read);
     }
@@ -66,7 +68,7 @@ Status SpillReader::open() {
     result.data = (char*)&max_sub_block_size_;
     RETURN_IF_ERROR(file_reader_->read_at(file_size - sizeof(size_t) * 2, 
result, &bytes_read));
     DCHECK(bytes_read == 8); // max_sub_block_size, block count
-    COUNTER_UPDATE(_read_file_size, bytes_read);
+    total_read_bytes += bytes_read;
     if (_query_statistics) {
         _query_statistics->add_spill_read_bytes_from_local_storage(bytes_read);
     }
@@ -87,7 +89,9 @@ Status SpillReader::open() {
 
     RETURN_IF_ERROR(file_reader_->read_at(read_offset, result, &bytes_read));
     DCHECK(bytes_read == block_count_ * sizeof(size_t));
-    COUNTER_UPDATE(_read_file_size, bytes_read);
+    total_read_bytes += bytes_read;
+    COUNTER_UPDATE(_read_file_size, total_read_bytes);
+    
ExecEnv::GetInstance()->spill_stream_mgr()->update_spill_read_bytes(total_read_bytes);
     if (_query_statistics) {
         _query_statistics->add_spill_read_bytes_from_local_storage(bytes_read);
     }
@@ -134,6 +138,7 @@ Status SpillReader::read(Block* block, bool* eos) {
 
     if (bytes_read > 0) {
         COUNTER_UPDATE(_read_file_size, bytes_read);
+        
ExecEnv::GetInstance()->spill_stream_mgr()->update_spill_read_bytes(bytes_read);
         if (_query_statistics) {
             
_query_statistics->add_spill_read_bytes_from_local_storage(bytes_read);
         }
diff --git a/be/src/vec/spill/spill_stream_manager.cpp 
b/be/src/vec/spill/spill_stream_manager.cpp
index 07a947b5ef3..833c5471fca 100644
--- a/be/src/vec/spill/spill_stream_manager.cpp
+++ b/be/src/vec/spill/spill_stream_manager.cpp
@@ -43,6 +43,9 @@
 namespace doris::vectorized {
 #include "common/compile_check_begin.h"
 
+SpillStreamManager::~SpillStreamManager() {
+    DorisMetrics::instance()->metric_registry()->deregister_entity(_entity);
+}
 SpillStreamManager::SpillStreamManager(
         std::unordered_map<std::string, 
std::unique_ptr<vectorized::SpillDataDir>>&&
                 spill_store_map)
@@ -84,9 +87,27 @@ Status SpillStreamManager::init() {
             "Spill", "spill_gc_thread", [this]() { 
this->_spill_gc_thread_callback(); },
             &_spill_gc_thread));
     LOG(INFO) << "spill gc thread started";
+
+    _init_metrics();
+
     return Status::OK();
 }
 
+void SpillStreamManager::_init_metrics() {
+    _entity = 
DorisMetrics::instance()->metric_registry()->register_entity("spill",
+                                                                           
{{"name", "spill"}});
+
+    _spill_write_bytes_metric = std::make_unique<doris::MetricPrototype>(
+            doris::MetricType::COUNTER, doris::MetricUnit::BYTES, 
"spill_write_bytes");
+    _spill_write_bytes_counter = 
(IntAtomicCounter*)(_entity->register_metric<IntAtomicCounter>(
+            _spill_write_bytes_metric.get()));
+
+    _spill_read_bytes_metric = std::make_unique<doris::MetricPrototype>(
+            doris::MetricType::COUNTER, doris::MetricUnit::BYTES, 
"spill_read_bytes");
+    _spill_read_bytes_counter = 
(IntAtomicCounter*)(_entity->register_metric<IntAtomicCounter>(
+            _spill_read_bytes_metric.get()));
+}
+
 // clean up stale spilled files
 void SpillStreamManager::_spill_gc_thread_callback() {
     while (!_stop_background_threads_latch.wait_for(
diff --git a/be/src/vec/spill/spill_stream_manager.h 
b/be/src/vec/spill/spill_stream_manager.h
index 7bcfe950097..53ae89e9111 100644
--- a/be/src/vec/spill/spill_stream_manager.h
+++ b/be/src/vec/spill/spill_stream_manager.h
@@ -30,6 +30,14 @@
 namespace doris {
 #include "common/compile_check_begin.h"
 class RuntimeProfile;
+template <typename T>
+class AtomicCounter;
+using IntAtomicCounter = AtomicCounter<int64_t>;
+template <typename T>
+class AtomicGauge;
+using UIntGauge = AtomicGauge<uint64_t>;
+class MetricEntity;
+struct MetricPrototype;
 
 namespace vectorized {
 
@@ -106,6 +114,7 @@ private:
 };
 class SpillStreamManager {
 public:
+    ~SpillStreamManager();
     SpillStreamManager(std::unordered_map<std::string, 
std::unique_ptr<vectorized::SpillDataDir>>&&
                                spill_store_map);
 
@@ -133,7 +142,12 @@ public:
 
     ThreadPool* get_spill_io_thread_pool() const { return 
_spill_io_thread_pool.get(); }
 
+    void update_spill_write_bytes(int64_t bytes) { 
_spill_write_bytes_counter->increment(bytes); }
+
+    void update_spill_read_bytes(int64_t bytes) { 
_spill_read_bytes_counter->increment(bytes); }
+
 private:
+    void _init_metrics();
     Status _init_spill_store_map();
     void _spill_gc_thread_callback();
     std::vector<SpillDataDir*> _get_stores_for_spill(TStorageMedium::type 
storage_medium);
@@ -145,6 +159,14 @@ private:
     scoped_refptr<Thread> _spill_gc_thread;
 
     std::atomic_uint64_t id_ = 0;
+
+    std::shared_ptr<MetricEntity> _entity {nullptr};
+
+    std::unique_ptr<doris::MetricPrototype> _spill_write_bytes_metric 
{nullptr};
+    std::unique_ptr<doris::MetricPrototype> _spill_read_bytes_metric {nullptr};
+
+    IntAtomicCounter* _spill_write_bytes_counter {nullptr};
+    IntAtomicCounter* _spill_read_bytes_counter {nullptr};
 };
 } // namespace vectorized
 } // namespace doris
diff --git a/be/src/vec/spill/spill_writer.cpp 
b/be/src/vec/spill/spill_writer.cpp
index 5cff9042103..3a576004091 100644
--- a/be/src/vec/spill/spill_writer.cpp
+++ b/be/src/vec/spill/spill_writer.cpp
@@ -59,6 +59,7 @@ Status SpillWriter::close() {
         COUNTER_UPDATE(_write_file_current_size, meta_.size());
     }
     data_dir_->update_spill_data_usage(meta_.size());
+    
ExecEnv::GetInstance()->spill_stream_mgr()->update_spill_write_bytes(meta_.size());
 
     RETURN_IF_ERROR(file_writer_->close());
 
@@ -143,6 +144,7 @@ Status SpillWriter::_write_internal(const Block& block, 
size_t& written_bytes) {
             Defer defer {[&]() {
                 if (status.ok()) {
                     data_dir_->update_spill_data_usage(buff_size);
+                    
ExecEnv::GetInstance()->spill_stream_mgr()->update_spill_write_bytes(buff_size);
 
                     written_bytes += buff_size;
                     max_sub_block_size_ = std::max(max_sub_block_size_, 
(size_t)buff_size);


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org


Reply via email to