This is an automated email from the ASF dual-hosted git repository.

jacktengg pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
     new fa771514d51 [spill](logs) add logs to debug spill bugs (#37144)
fa771514d51 is described below

commit fa771514d51840d4a3da5b7ce700bf1309637875
Author:     TengJianPing <18241664+jackte...@users.noreply.github.com>
AuthorDate: Thu Jul 4 11:18:53 2024 +0800

[spill](logs) add logs to debug spill bugs (#37144)

Add logs to debug spill hash join bugs:
```
*** Query id: d7f1126be4e948c6-87f1a80ed3cbd69e ***
*** is nereids: 0 ***
*** tablet id: 0 ***
*** Aborted at 1719291313 (unix time) try "date -d @1719291313" if you are using GNU date ***
*** Current BE git commitID: 5f5262a885 ***
*** SIGSEGV address not mapped to object (@0x8) received by PID 1419021 (TID 1421288 OR 0x7f0212b43640) from PID 8; stack trace: ***
 0# doris::signal::(anonymous namespace)::FailureSignalHandler(int, siginfo_t*, void*) at /home/zcp/repo_center/doris_master/doris/be/src/common/signal_handler.h:421
 1# PosixSignals::chained_handler(int, siginfo*, void*) [clone .part.0] in /usr/lib/jvm/java-17-openjdk-amd64/lib/server/libjvm.so
 2# JVM_handle_linux_signal in /usr/lib/jvm/java-17-openjdk-amd64/lib/server/libjvm.so
 3# 0x00007F06BD506520 in /lib/x86_64-linux-gnu/libc.so.6
 4# doris::vectorized::SpillReader::read(doris::vectorized::Block*, bool*) at /home/zcp/repo_center/doris_master/doris/be/src/vec/spill/spill_reader.cpp:96
 5# doris::vectorized::SpillStream::read_next_block_sync(doris::vectorized::Block*, bool*) in /mnt/disk1/STRESS_ENV/be/lib/doris_be
 6# std::_Function_handler<void (), doris::pipeline::PartitionedHashJoinProbeLocalState::recovery_build_blocks_from_disk(doris::RuntimeState*, unsigned int, bool&)::$_1>::_M_invoke(std::_Any_data const&) at /var/local/ldb-toolchain/bin/../lib/gcc/x86_64-linux-gnu/11/../../../../include/c++/11/bits/std_function.h:291
 7# doris::ThreadPool::dispatch_thread() in /mnt/disk1/STRESS_ENV/be/lib/doris_be
 8# doris::Thread::supervise_thread(void*) at /home/zcp/repo_center/doris_master/doris/be/src/util/thread.cpp:499
 9# start_thread at ./nptl/pthread_create.c:442
10# 0x00007F06BD5EA850 at ../sysdeps/unix/sysv/linux/x86_64/clone3.S:83
```
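All of the messages added by this commit are emitted at VLOG_DEBUG level, so they are compiled in but stay silent unless verbose logging is enabled for the affected files (on a BE deployment this is normally driven by the verbose-logging options in be.conf, e.g. sys_log_verbose_modules / sys_log_verbose_level; exact names may differ between versions). A minimal glog sketch of that mechanism, written as a hypothetical standalone program rather than actual Doris code:

```cpp
// Hypothetical standalone illustration of glog verbose logging, the mechanism
// that Doris' VLOG_DEBUG / VLOG_DEBUG_IS_ON macros map to; not Doris code.
#include <glog/logging.h>

int main(int argc, char** argv) {
    google::InitGoogleLogging(argv[0]);
    FLAGS_logtostderr = true; // print to stderr so the sketch is easy to observe
    FLAGS_v = 10;             // enable verbose levels up to 10 for every file

    int partition_index = 3;
    // VLOG(n) statements are compiled in but only emitted when the active
    // verbose level is >= n, so they are cheap to leave on hot paths.
    if (VLOG_IS_ON(7)) {
        VLOG(7) << "partition: " << partition_index << ", recovering build data";
    }
    return 0;
}
```

The VLOG_DEBUG_IS_ON guard added in pipeline_task.cpp further below follows the same idea: skip building the message entirely when the verbose level is not active.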
---
 be/src/pipeline/dependency.cpp                     |  7 ++--
 .../exec/partitioned_hash_join_probe_operator.cpp  | 42 +++++++++++++++++++---
 be/src/pipeline/pipeline_fragment_context.cpp      |  1 +
 be/src/pipeline/pipeline_task.cpp                  | 19 ++++++----
 be/src/runtime/runtime_state.h                     |  5 +++
 5 files changed, 59 insertions(+), 15 deletions(-)

diff --git a/be/src/pipeline/dependency.cpp b/be/src/pipeline/dependency.cpp
index 4938883062a..5e1ce79a1eb 100644
--- a/be/src/pipeline/dependency.cpp
+++ b/be/src/pipeline/dependency.cpp
@@ -82,9 +82,10 @@ Dependency* Dependency::is_blocked_by(PipelineTask* task) {
 
 std::string Dependency::debug_string(int indentation_level) {
     fmt::memory_buffer debug_string_buffer;
-    fmt::format_to(debug_string_buffer, "{}{}: id={}, block task = {}, ready={}, _always_ready={}",
-                   std::string(indentation_level * 2, ' '), _name, _node_id, _blocked_task.size(),
-                   _ready, _always_ready);
+    fmt::format_to(debug_string_buffer,
+                   "{}this={}, {}: id={}, block task = {}, ready={}, _always_ready={}",
+                   std::string(indentation_level * 2, ' '), (void*)this, _name, _node_id,
+                   _blocked_task.size(), _ready, _always_ready);
     return fmt::to_string(debug_string_buffer);
 }
 
diff --git a/be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp b/be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp
index 1ff927bcc6d..09976b3060e 100644
--- a/be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp
+++ b/be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp
@@ -258,6 +258,9 @@ Status PartitionedHashJoinProbeLocalState::finish_spilling(uint32_t partition_in
 Status PartitionedHashJoinProbeLocalState::recovery_build_blocks_from_disk(RuntimeState* state,
                                                                            uint32_t partition_index,
                                                                            bool& has_data) {
+    VLOG_DEBUG << "query: " << print_id(state->query_id()) << ", node: " << _parent->node_id()
+               << ", task id: " << state->task_id() << ", partition: " << partition_index
+               << " recovery_build_blocks_from_disk";
     auto& spilled_stream = _shared_state->spilled_streams[partition_index];
     has_data = false;
     if (!spilled_stream) {
@@ -292,6 +295,9 @@ Status PartitionedHashJoinProbeLocalState::recovery_build_blocks_from_disk(Runti
         SCOPED_TIMER(_recovery_build_timer);
 
         bool eos = false;
+        VLOG_DEBUG << "query: " << print_id(state->query_id()) << ", node: " << _parent->node_id()
+                   << ", task id: " << state->task_id() << ", partition: " << partition_index
+                   << ", recoverying build data";
         while (!eos) {
             vectorized::Block block;
             Status st;
@@ -332,12 +338,12 @@ Status PartitionedHashJoinProbeLocalState::recovery_build_blocks_from_disk(Runti
             }
         }
 
-        VLOG_DEBUG << "query: " << print_id(state->query_id())
-                   << ", recovery data done for partition: " << spilled_stream->get_spill_dir()
-                   << ", task id: " << state->task_id();
         ExecEnv::GetInstance()->spill_stream_mgr()->delete_spill_stream(spilled_stream);
         shared_state_sptr->spilled_streams[partition_index].reset();
         _dependency->set_ready();
+        VLOG_DEBUG << "query: " << print_id(state->query_id()) << ", node: " << _parent->node_id()
+                   << ", task id: " << state->task_id() << ", partition: " << partition_index
+                   << ", recovery build data done";
     };
 
     auto exception_catch_func = [read_func, query_id, this]() {
@@ -362,6 +368,16 @@ Status PartitionedHashJoinProbeLocalState::recovery_build_blocks_from_disk(Runti
     auto* spill_io_pool = ExecEnv::GetInstance()->spill_stream_mgr()->get_spill_io_thread_pool();
     has_data = true;
     _dependency->block();
+    {
+        auto* pipeline_task = state->get_task();
+        if (pipeline_task) {
+            auto& p = _parent->cast<PartitionedHashJoinProbeOperatorX>();
+            VLOG_DEBUG << "query: " << print_id(state->query_id()) << ", node: " << p.node_id()
+                       << ", task id: " << state->task_id() << ", partition: " << partition_index
+                       << ", dependency: " << _dependency
+                       << ", task debug_string: " << pipeline_task->debug_string();
+        }
+    }
 
     DBUG_EXECUTE_IF(
             "fault_inject::partitioned_hash_join_probe::recovery_build_blocks_submit_func", {
@@ -371,15 +387,31 @@ Status PartitionedHashJoinProbeLocalState::recovery_build_blocks_from_disk(Runti
             });
     auto spill_runnable = std::make_shared<SpillRunnable>(state, _shared_state->shared_from_this(),
                                                           exception_catch_func);
+    VLOG_DEBUG << "query: " << print_id(state->query_id()) << ", node: " << _parent->node_id()
+               << ", task id: " << state->task_id() << ", partition: " << partition_index
+               << " recovery_build_blocks_from_disk submit func";
     return spill_io_pool->submit(std::move(spill_runnable));
 }
 
 std::string PartitionedHashJoinProbeLocalState::debug_string(int indentation_level) const {
+    auto& p = _parent->cast<PartitionedHashJoinProbeOperatorX>();
+    bool need_more_input_data;
+    if (_shared_state->need_to_spill) {
+        need_more_input_data = !_child_eos;
+    } else if (_runtime_state) {
+        need_more_input_data = p._inner_probe_operator->need_more_input_data(_runtime_state.get());
+    } else {
+        need_more_input_data = true;
+    }
     fmt::memory_buffer debug_string_buffer;
-    fmt::format_to(debug_string_buffer, "{}, short_circuit_for_probe: {}",
+    fmt::format_to(debug_string_buffer,
+                   "{}, short_circuit_for_probe: {}, need_to_spill: {}, child_eos: {}, "
+                   "_runtime_state: {}, need_more_input_data: {}",
                    PipelineXSpillLocalState<PartitionedHashJoinSharedState>::debug_string(
                            indentation_level),
-                   _shared_state ? std::to_string(_shared_state->short_circuit_for_probe) : "NULL");
+                   _shared_state ? std::to_string(_shared_state->short_circuit_for_probe) : "NULL",
+                   _shared_state->need_to_spill, _child_eos, _runtime_state != nullptr,
+                   need_more_input_data);
     return fmt::to_string(debug_string_buffer);
 }
 
diff --git a/be/src/pipeline/pipeline_fragment_context.cpp b/be/src/pipeline/pipeline_fragment_context.cpp
index 94837ff55a0..0968de7951e 100644
--- a/be/src/pipeline/pipeline_fragment_context.cpp
+++ b/be/src/pipeline/pipeline_fragment_context.cpp
@@ -455,6 +455,7 @@ Status PipelineFragmentContext::_build_pipeline_tasks(
                     task_runtime_state.get(), this, pipeline_id_to_profile[pip_idx].get(),
                     get_local_exchange_state(pipeline), i);
+            task_runtime_state->set_task(task.get());
             pipeline_id_to_task.insert({pipeline->id(), task.get()});
             _tasks[i].emplace_back(std::move(task));
         }
diff --git a/be/src/pipeline/pipeline_task.cpp b/be/src/pipeline/pipeline_task.cpp
index 09f32d9d23e..52951e1c9c0 100644
--- a/be/src/pipeline/pipeline_task.cpp
+++ b/be/src/pipeline/pipeline_task.cpp
@@ -254,6 +254,11 @@ bool PipelineTask::_is_blocked() {
         }
         // If all dependencies are ready for this operator, we can execute this task if no datum is needed from upstream operators.
         if (!_operators[i]->need_more_input_data(_state)) {
+            if (VLOG_DEBUG_IS_ON) {
+                VLOG_DEBUG << "query: " << print_id(_state->query_id())
+                           << ", task id: " << _index << ", operator " << i
+                           << " not need_more_input_data";
+            }
             break;
         }
     }
@@ -471,13 +476,13 @@ std::string PipelineTask::debug_string() {
     auto* cur_blocked_dep = _blocked_dep;
     auto elapsed = _fragment_context->elapsed_time() / 1000000000.0;
 
-    fmt::format_to(
-            debug_string_buffer,
-            "PipelineTask[this = {}, open = {}, eos = {}, finish = {}, dry run = {}, elapse time "
-            "= {}s], block dependency = {}, is running = {}\noperators: ",
-            (void*)this, _opened, _eos, _finalized, _dry_run, elapsed,
-            cur_blocked_dep && !_finalized ? cur_blocked_dep->debug_string() : "NULL",
-            is_running());
+    fmt::format_to(debug_string_buffer,
+                   "PipelineTask[this = {}, id = {}, open = {}, eos = {}, finish = {}, dry run = "
+                   "{}, elapse time "
+                   "= {}s], block dependency = {}, is running = {}\noperators: ",
+                   (void*)this, _index, _opened, _eos, _finalized, _dry_run, elapsed,
+                   cur_blocked_dep && !_finalized ? cur_blocked_dep->debug_string() : "NULL",
+                   is_running());
     for (size_t i = 0; i < _operators.size(); i++) {
         fmt::format_to(debug_string_buffer, "\n{}",
                        _opened && !_finalized ? _operators[i]->debug_string(_state, i)
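The runtime_state.h hunk below is what makes the task dump in the probe operator possible: PipelineFragmentContext stores a raw back-pointer to the owning PipelineTask in each task's RuntimeState, and the spill-recovery path reads it back to log PipelineTask::debug_string() before blocking its dependency. A condensed sketch of that wiring, using simplified stand-in types rather than the real Doris classes:

```cpp
// Condensed, self-contained sketch of the back-pointer wiring; the types here
// are simplified stand-ins, not the actual doris::RuntimeState / PipelineTask.
#include <iostream>
#include <string>

struct PipelineTask {
    std::string debug_string() const { return "PipelineTask[id = 0, open = true, ...]"; }
};

// Stand-in for RuntimeState: keeps an unowned back-pointer to the driving task.
struct RuntimeState {
    void set_task(PipelineTask* task) { _task = task; }
    PipelineTask* get_task() const { return _task; }

private:
    PipelineTask* _task = nullptr;
};

int main() {
    PipelineTask task;
    RuntimeState state;
    state.set_task(&task); // done once when the fragment context builds its tasks

    // Later, e.g. right before the spill-recovery path blocks its dependency,
    // code that only holds the RuntimeState can still dump the task's state.
    if (auto* t = state.get_task()) {
        std::cout << t->debug_string() << std::endl;
    }
    return 0;
}
```

Because the pointer is raw and unowned, it stays valid only as long as the owning task does; in the diff it is read purely for logging.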
diff --git a/be/src/runtime/runtime_state.h b/be/src/runtime/runtime_state.h
index 49c051de44d..e89e7be66f5 100644
--- a/be/src/runtime/runtime_state.h
+++ b/be/src/runtime/runtime_state.h
@@ -617,6 +617,10 @@ public:
 
     void set_task_id(int id) { _task_id = id; }
 
+    void set_task(pipeline::PipelineTask* task) { _task = task; }
+
+    pipeline::PipelineTask* get_task() const { return _task; }
+
     int task_id() const { return _task_id; }
 
     void set_task_num(int task_num) { _task_num = task_num; }
@@ -721,6 +725,7 @@ private:
     std::vector<TTabletCommitInfo> _tablet_commit_infos;
     std::vector<TErrorTabletInfo> _error_tablet_infos;
     int _max_operator_id = 0;
+    pipeline::PipelineTask* _task = nullptr;
     int _task_id = -1;
     int _task_num = 0;
 

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org