This is an automated email from the ASF dual-hosted git repository.

jacktengg pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new fa771514d51 [spill](logs) add logs to debug spill bugs (#37144)
fa771514d51 is described below

commit fa771514d51840d4a3da5b7ce700bf1309637875
Author: TengJianPing <18241664+jackte...@users.noreply.github.com>
AuthorDate: Thu Jul 4 11:18:53 2024 +0800

    [spill](logs) add logs to debug spill bugs (#37144)
    
    Add logs to help debug spilling hash join bugs, such as the following crash:
    ```
    *** Query id: d7f1126be4e948c6-87f1a80ed3cbd69e ***
    *** is nereids: 0 ***
    *** tablet id: 0 ***
    *** Aborted at 1719291313 (unix time) try "date -d @1719291313" if you are using GNU date ***
    *** Current BE git commitID: 5f5262a885 ***
    *** SIGSEGV address not mapped to object (@0x8) received by PID 1419021 (TID 1421288 OR 0x7f0212b43640) from PID 8; stack trace: ***
     0# doris::signal::(anonymous namespace)::FailureSignalHandler(int, siginfo_t*, void*) at /home/zcp/repo_center/doris_master/doris/be/src/common/signal_handler.h:421
     1# PosixSignals::chained_handler(int, siginfo*, void*) [clone .part.0] in /usr/lib/jvm/java-17-openjdk-amd64/lib/server/libjvm.so
     2# JVM_handle_linux_signal in /usr/lib/jvm/java-17-openjdk-amd64/lib/server/libjvm.so
     3# 0x00007F06BD506520 in /lib/x86_64-linux-gnu/libc.so.6
     4# doris::vectorized::SpillReader::read(doris::vectorized::Block*, bool*) at /home/zcp/repo_center/doris_master/doris/be/src/vec/spill/spill_reader.cpp:96
     5# doris::vectorized::SpillStream::read_next_block_sync(doris::vectorized::Block*, bool*) in /mnt/disk1/STRESS_ENV/be/lib/doris_be
     6# std::_Function_handler<void (), doris::pipeline::PartitionedHashJoinProbeLocalState::recovery_build_blocks_from_disk(doris::RuntimeState*, unsigned int, bool&)::$_1>::_M_invoke(std::_Any_data const&) at /var/local/ldb-toolchain/bin/../lib/gcc/x86_64-linux-gnu/11/../../../../include/c++/11/bits/std_function.h:291
     7# doris::ThreadPool::dispatch_thread() in /mnt/disk1/STRESS_ENV/be/lib/doris_be
     8# doris::Thread::supervise_thread(void*) at /home/zcp/repo_center/doris_master/doris/be/src/util/thread.cpp:499
     9# start_thread at ./nptl/pthread_create.c:442
    10# 0x00007F06BD5EA850 at ../sysdeps/unix/sysv/linux/x86_64/clone3.S:83
    ```
---
 be/src/pipeline/dependency.cpp                     |  7 ++--
 .../exec/partitioned_hash_join_probe_operator.cpp  | 42 +++++++++++++++++++---
 be/src/pipeline/pipeline_fragment_context.cpp      |  1 +
 be/src/pipeline/pipeline_task.cpp                  | 19 ++++++----
 be/src/runtime/runtime_state.h                     |  5 +++
 5 files changed, 59 insertions(+), 15 deletions(-)

diff --git a/be/src/pipeline/dependency.cpp b/be/src/pipeline/dependency.cpp
index 4938883062a..5e1ce79a1eb 100644
--- a/be/src/pipeline/dependency.cpp
+++ b/be/src/pipeline/dependency.cpp
@@ -82,9 +82,10 @@ Dependency* Dependency::is_blocked_by(PipelineTask* task) {
 
 std::string Dependency::debug_string(int indentation_level) {
     fmt::memory_buffer debug_string_buffer;
-    fmt::format_to(debug_string_buffer, "{}{}: id={}, block task = {}, 
ready={}, _always_ready={}",
-                   std::string(indentation_level * 2, ' '), _name, _node_id, 
_blocked_task.size(),
-                   _ready, _always_ready);
+    fmt::format_to(debug_string_buffer,
+                   "{}this={}, {}: id={}, block task = {}, ready={}, 
_always_ready={}",
+                   std::string(indentation_level * 2, ' '), (void*)this, 
_name, _node_id,
+                   _blocked_task.size(), _ready, _always_ready);
     return fmt::to_string(debug_string_buffer);
 }
 
diff --git a/be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp 
b/be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp
index 1ff927bcc6d..09976b3060e 100644
--- a/be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp
+++ b/be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp
@@ -258,6 +258,9 @@ Status 
PartitionedHashJoinProbeLocalState::finish_spilling(uint32_t partition_in
 Status 
PartitionedHashJoinProbeLocalState::recovery_build_blocks_from_disk(RuntimeState*
 state,
                                                                            
uint32_t partition_index,
                                                                            
bool& has_data) {
+    VLOG_DEBUG << "query: " << print_id(state->query_id()) << ", node: " << 
_parent->node_id()
+               << ", task id: " << state->task_id() << ", partition: " << 
partition_index
+               << " recovery_build_blocks_from_disk";
     auto& spilled_stream = _shared_state->spilled_streams[partition_index];
     has_data = false;
     if (!spilled_stream) {
@@ -292,6 +295,9 @@ Status 
PartitionedHashJoinProbeLocalState::recovery_build_blocks_from_disk(Runti
         SCOPED_TIMER(_recovery_build_timer);
 
         bool eos = false;
+        VLOG_DEBUG << "query: " << print_id(state->query_id()) << ", node: " 
<< _parent->node_id()
+                   << ", task id: " << state->task_id() << ", partition: " << 
partition_index
+                   << ", recoverying build data";
         while (!eos) {
             vectorized::Block block;
             Status st;
@@ -332,12 +338,12 @@ Status 
PartitionedHashJoinProbeLocalState::recovery_build_blocks_from_disk(Runti
             }
         }
 
-        VLOG_DEBUG << "query: " << print_id(state->query_id())
-                   << ", recovery data done for partition: " << 
spilled_stream->get_spill_dir()
-                   << ", task id: " << state->task_id();
         
ExecEnv::GetInstance()->spill_stream_mgr()->delete_spill_stream(spilled_stream);
         shared_state_sptr->spilled_streams[partition_index].reset();
         _dependency->set_ready();
+        VLOG_DEBUG << "query: " << print_id(state->query_id()) << ", node: " 
<< _parent->node_id()
+                   << ", task id: " << state->task_id() << ", partition: " << 
partition_index
+                   << ", recovery build data done";
     };
 
     auto exception_catch_func = [read_func, query_id, this]() {
@@ -362,6 +368,16 @@ Status 
PartitionedHashJoinProbeLocalState::recovery_build_blocks_from_disk(Runti
     auto* spill_io_pool = 
ExecEnv::GetInstance()->spill_stream_mgr()->get_spill_io_thread_pool();
     has_data = true;
     _dependency->block();
+    {
+        auto* pipeline_task = state->get_task();
+        if (pipeline_task) {
+            auto& p = _parent->cast<PartitionedHashJoinProbeOperatorX>();
+            VLOG_DEBUG << "query: " << print_id(state->query_id()) << ", node: 
" << p.node_id()
+                       << ", task id: " << state->task_id() << ", partition: " 
<< partition_index
+                       << ", dependency: " << _dependency
+                       << ", task debug_string: " << 
pipeline_task->debug_string();
+        }
+    }
 
     
DBUG_EXECUTE_IF("fault_inject::partitioned_hash_join_probe::recovery_build_blocks_submit_func",
                     {
@@ -371,15 +387,31 @@ Status 
PartitionedHashJoinProbeLocalState::recovery_build_blocks_from_disk(Runti
                     });
     auto spill_runnable = std::make_shared<SpillRunnable>(state, 
_shared_state->shared_from_this(),
                                                           
exception_catch_func);
+    VLOG_DEBUG << "query: " << print_id(state->query_id()) << ", node: " << 
_parent->node_id()
+               << ", task id: " << state->task_id() << ", partition: " << 
partition_index
+               << " recovery_build_blocks_from_disk submit func";
     return spill_io_pool->submit(std::move(spill_runnable));
 }
 
 std::string PartitionedHashJoinProbeLocalState::debug_string(int 
indentation_level) const {
+    auto& p = _parent->cast<PartitionedHashJoinProbeOperatorX>();
+    bool need_more_input_data;
+    if (_shared_state->need_to_spill) {
+        need_more_input_data = !_child_eos;
+    } else if (_runtime_state) {
+        need_more_input_data = 
p._inner_probe_operator->need_more_input_data(_runtime_state.get());
+    } else {
+        need_more_input_data = true;
+    }
     fmt::memory_buffer debug_string_buffer;
-    fmt::format_to(debug_string_buffer, "{}, short_circuit_for_probe: {}",
+    fmt::format_to(debug_string_buffer,
+                   "{}, short_circuit_for_probe: {}, need_to_spill: {}, 
child_eos: {}, "
+                   "_runtime_state: {}, need_more_input_data: {}",
                    
PipelineXSpillLocalState<PartitionedHashJoinSharedState>::debug_string(
                            indentation_level),
-                   _shared_state ? 
std::to_string(_shared_state->short_circuit_for_probe) : "NULL");
+                   _shared_state ? 
std::to_string(_shared_state->short_circuit_for_probe) : "NULL",
+                   _shared_state->need_to_spill, _child_eos, _runtime_state != 
nullptr,
+                   need_more_input_data);
     return fmt::to_string(debug_string_buffer);
 }
 
diff --git a/be/src/pipeline/pipeline_fragment_context.cpp 
b/be/src/pipeline/pipeline_fragment_context.cpp
index 94837ff55a0..0968de7951e 100644
--- a/be/src/pipeline/pipeline_fragment_context.cpp
+++ b/be/src/pipeline/pipeline_fragment_context.cpp
@@ -455,6 +455,7 @@ Status PipelineFragmentContext::_build_pipeline_tasks(
                                                            
task_runtime_state.get(), this,
                                                            
pipeline_id_to_profile[pip_idx].get(),
                                                            
get_local_exchange_state(pipeline), i);
+                task_runtime_state->set_task(task.get());
                 pipeline_id_to_task.insert({pipeline->id(), task.get()});
                 _tasks[i].emplace_back(std::move(task));
             }
diff --git a/be/src/pipeline/pipeline_task.cpp 
b/be/src/pipeline/pipeline_task.cpp
index 09f32d9d23e..52951e1c9c0 100644
--- a/be/src/pipeline/pipeline_task.cpp
+++ b/be/src/pipeline/pipeline_task.cpp
@@ -254,6 +254,11 @@ bool PipelineTask::_is_blocked() {
             }
             // If all dependencies are ready for this operator, we can execute 
this task if no datum is needed from upstream operators.
             if (!_operators[i]->need_more_input_data(_state)) {
+                if (VLOG_DEBUG_IS_ON) {
+                    VLOG_DEBUG << "query: " << print_id(_state->query_id())
+                               << ", task id: " << _index << ", operator " << i
+                               << " not need_more_input_data";
+                }
                 break;
             }
         }
@@ -471,13 +476,13 @@ std::string PipelineTask::debug_string() {
 
     auto* cur_blocked_dep = _blocked_dep;
     auto elapsed = _fragment_context->elapsed_time() / 1000000000.0;
-    fmt::format_to(
-            debug_string_buffer,
-            "PipelineTask[this = {}, open = {}, eos = {}, finish = {}, dry run 
= {}, elapse time "
-            "= {}s], block dependency = {}, is running = {}\noperators: ",
-            (void*)this, _opened, _eos, _finalized, _dry_run, elapsed,
-            cur_blocked_dep && !_finalized ? cur_blocked_dep->debug_string() : 
"NULL",
-            is_running());
+    fmt::format_to(debug_string_buffer,
+                   "PipelineTask[this = {}, id = {}, open = {}, eos = {}, 
finish = {}, dry run = "
+                   "{}, elapse time "
+                   "= {}s], block dependency = {}, is running = {}\noperators: 
",
+                   (void*)this, _index, _opened, _eos, _finalized, _dry_run, 
elapsed,
+                   cur_blocked_dep && !_finalized ? 
cur_blocked_dep->debug_string() : "NULL",
+                   is_running());
     for (size_t i = 0; i < _operators.size(); i++) {
         fmt::format_to(debug_string_buffer, "\n{}",
                        _opened && !_finalized ? 
_operators[i]->debug_string(_state, i)
diff --git a/be/src/runtime/runtime_state.h b/be/src/runtime/runtime_state.h
index 49c051de44d..e89e7be66f5 100644
--- a/be/src/runtime/runtime_state.h
+++ b/be/src/runtime/runtime_state.h
@@ -617,6 +617,10 @@ public:
 
     void set_task_id(int id) { _task_id = id; }
 
+    void set_task(pipeline::PipelineTask* task) { _task = task; }
+
+    pipeline::PipelineTask* get_task() const { return _task; }
+
     int task_id() const { return _task_id; }
 
     void set_task_num(int task_num) { _task_num = task_num; }
@@ -721,6 +725,7 @@ private:
     std::vector<TTabletCommitInfo> _tablet_commit_infos;
     std::vector<TErrorTabletInfo> _error_tablet_infos;
     int _max_operator_id = 0;
+    pipeline::PipelineTask* _task = nullptr;
     int _task_id = -1;
     int _task_num = 0;
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to